Оптимизация массового парсинга: сочетание aiohttp и многопоточности

Мне необходимо обработать значительный объем данных - примерно 15 000 страниц, каждая из которых содержит до 35 статей, распределенных по 15 рубрикам. При использовании синхронного подхода через библиотеку requests в одном потоке выполнение задачи займет несколько часов, что неприемлемо, так как информация требуется в кратчайшие сроки.

Для решения этой проблемы я планирую использовать комбинацию асинхронных запросов через aiohttp и многопоточную обработку. Идея заключается в следующем:

  • Каждый из потоков будет параллельно обрабатывать тысячи страниц, относящихся к определенной рубрике.
  • Внутри каждого потока запросы к страницам будут выполняться асинхронно с помощью aiohttp.

Однако возникает техническая сложность: стандартный ThreadPoolExecutor не позволяет напрямую передавать в метод submit асинхронные функции (с ключевым словом await). Если же вызывать такую функцию напрямую, многопоточность работать не будет - потоки будут выполняться последовательно, что сводит на нет весь замысел.

Вопрос: как правильно реализовать эту архитектуру? Ниже представлен текущий вариант кода.

Импорт библиотек:

  • import bs4
  • import aiohttp, asyncio
  • from urllib import parse
  • import certifi, ssl
  • from concurrent.futures import ThreadPoolExecutor

Импорт модулей проекта:

  • from utils import to_dict, fetch

Глобальные переменные:

  • items_list = []
  • SSL_CERT = ssl.create_default_context(cafile=certifi.where())

Асинхронная функция обработки рубрики:

async def serialize_topic(topic: str, topics: list[str]) -> None:
print(f'Starting to serialize {topics.index(topic)+1}. {topic}')
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=SSL_CERT)) as session:
topic_html = await fetch(func=session.get, url=topic, params={'limit' : 35})
topic_soup = bs4.BeautifulSoup(topic_html, 'html.parser')
# Определение номера последней страницы для пагинации
pagination = list(topic_soup('ul', {'class' : 'pagination'}).children)
max_page_url = pagination[-1].a.get('href')
max_page_params = parse.urlparse(max_page_url).query
dict_from_query = parse.parse_qs(max_page_params)
max_page = int(dict_from_query['page'][0])
# Обработка всех страниц
for i in range(max_page):
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=SSL_CERT)) as session:
page_html = await fetch(func=session.get, url=topic, params={'page' : i, 'limit' : 100})
page_soup = bs4.BeautifulSoup(page_html, 'html.parser')
items = page_soup.find_all(class_='article')
# Обработка статей на странице
for item in items:
# Получение заголовка и данных статьи
...
print(f'Serialized page number {i+1}')
print(f'Serialized {topics.index(topic)+1}. {topic}')

Основная асинхронная функция:

async def main():
# Получение списка всех рубрик
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=SSL_CERT)) as session:
html = await fetch(func=session.get, url='')
soup = bs4.BeautifulSoup(html, 'html.parser')
topics = soup.find('div', {'id' : 'content'}).find_all(class_='topic')
topics = [item.a.get('href') for item in topics]
# Попытка использования ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=15) as executor:
for topic in topics:
executor.submit(await serialize_topic(), topic, topics)
print('\n**************************\n')
print(f'Total amount of articles: {len(items_list)}')

Точка входа:

if __name__ == '__main__':
asyncio.run(main())