Оптимизация массового парсинга: сочетание aiohttp и многопоточности
Мне необходимо обработать значительный объем данных - примерно 15 000 страниц, каждая из которых содержит до 35 статей, распределенных по 15 рубрикам. При использовании синхронного подхода через библиотеку requests в одном потоке выполнение задачи займет несколько часов, что неприемлемо, так как информация требуется в кратчайшие сроки.
Для решения этой проблемы я планирую использовать комбинацию асинхронных запросов через aiohttp и многопоточную обработку. Идея заключается в следующем:
- Каждый из потоков будет параллельно обрабатывать тысячи страниц, относящихся к определенной рубрике.
- Внутри каждого потока запросы к страницам будут выполняться асинхронно с помощью aiohttp.
Однако возникает техническая сложность: стандартный ThreadPoolExecutor не позволяет напрямую передавать в метод submit асинхронные функции (с ключевым словом await). Если же вызывать такую функцию напрямую, многопоточность работать не будет - потоки будут выполняться последовательно, что сводит на нет весь замысел.
Вопрос: как правильно реализовать эту архитектуру? Ниже представлен текущий вариант кода.
Импорт библиотек:
- import bs4
- import aiohttp, asyncio
- from urllib import parse
- import certifi, ssl
- from concurrent.futures import ThreadPoolExecutor
Импорт модулей проекта:
- from utils import to_dict, fetch
Глобальные переменные:
- items_list = []
- SSL_CERT = ssl.create_default_context(cafile=certifi.where())
Асинхронная функция обработки рубрики:
async def serialize_topic(topic: str, topics: list[str]) -> None:
print(f'Starting to serialize {topics.index(topic)+1}. {topic}')
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=SSL_CERT)) as session:
topic_html = await fetch(func=session.get, url=topic, params={'limit' : 35})
topic_soup = bs4.BeautifulSoup(topic_html, 'html.parser')
# Определение номера последней страницы для пагинации
pagination = list(topic_soup('ul', {'class' : 'pagination'}).children)
max_page_url = pagination[-1].a.get('href')
max_page_params = parse.urlparse(max_page_url).query
dict_from_query = parse.parse_qs(max_page_params)
max_page = int(dict_from_query['page'][0])
# Обработка всех страниц
for i in range(max_page):
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=SSL_CERT)) as session:
page_html = await fetch(func=session.get, url=topic, params={'page' : i, 'limit' : 100})
page_soup = bs4.BeautifulSoup(page_html, 'html.parser')
items = page_soup.find_all(class_='article')
# Обработка статей на странице
for item in items:
# Получение заголовка и данных статьи
...
print(f'Serialized page number {i+1}')
print(f'Serialized {topics.index(topic)+1}. {topic}')
Основная асинхронная функция:
async def main():
# Получение списка всех рубрик
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=SSL_CERT)) as session:
html = await fetch(func=session.get, url='')
soup = bs4.BeautifulSoup(html, 'html.parser')
topics = soup.find('div', {'id' : 'content'}).find_all(class_='topic')
topics = [item.a.get('href') for item in topics]
# Попытка использования ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=15) as executor:
for topic in topics:
executor.submit(await serialize_topic(), topic, topics)
print('\n**************************\n')
print(f'Total amount of articles: {len(items_list)}')
Точка входа:
if __name__ == '__main__':
asyncio.run(main())