Оптимизация массового парсинга: сочетание aiohttp и многопоточности

    Мне необходимо обработать значительный объем данных - примерно 15 000 страниц, каждая из которых содержит до 35 статей, распределенных по 15 рубрикам. При использовании синхронного подхода через библиотеку requests в одном потоке выполнение задачи займет несколько часов, что неприемлемо, так как информация требуется в кратчайшие сроки.

    Для решения этой проблемы я планирую использовать комбинацию асинхронных запросов через aiohttp и многопоточную обработку. Идея заключается в следующем:

    • Каждый из потоков будет параллельно обрабатывать тысячи страниц, относящихся к определенной рубрике.
    • Внутри каждого потока запросы к страницам будут выполняться асинхронно с помощью aiohttp.

    Однако возникает техническая сложность: стандартный ThreadPoolExecutor не позволяет напрямую передавать в метод submit асинхронные функции (с ключевым словом await). Если же вызывать такую функцию напрямую, многопоточность работать не будет - потоки будут выполняться последовательно, что сводит на нет весь замысел.

    Вопрос: как правильно реализовать эту архитектуру? Ниже представлен текущий вариант кода.

    Импорт библиотек:

    • import bs4
    • import aiohttp, asyncio
    • from urllib import parse
    • import certifi, ssl
    • from concurrent.futures import ThreadPoolExecutor

    Импорт модулей проекта:

    • from utils import to_dict, fetch

    Глобальные переменные:

    • items_list = []
    • SSL_CERT = ssl.create_default_context(cafile=certifi.where())

    Асинхронная функция обработки рубрики:

    async def serialize_topic(topic: str, topics: list[str]) -> None:
    print(f'Starting to serialize {topics.index(topic)+1}. {topic}')
    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=SSL_CERT)) as session:
    topic_html = await fetch(func=session.get, url=topic, params={'limit' : 35})
    topic_soup = bs4.BeautifulSoup(topic_html, 'html.parser')
    # Определение номера последней страницы для пагинации
    pagination = list(topic_soup('ul', {'class' : 'pagination'}).children)
    max_page_url = pagination[-1].a.get('href')
    max_page_params = parse.urlparse(max_page_url).query
    dict_from_query = parse.parse_qs(max_page_params)
    max_page = int(dict_from_query['page'][0])
    # Обработка всех страниц
    for i in range(max_page):
    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=SSL_CERT)) as session:
    page_html = await fetch(func=session.get, url=topic, params={'page' : i, 'limit' : 100})
    page_soup = bs4.BeautifulSoup(page_html, 'html.parser')
    items = page_soup.find_all(class_='article')
    # Обработка статей на странице
    for item in items:
    # Получение заголовка и данных статьи
    ...
    print(f'Serialized page number {i+1}')
    print(f'Serialized {topics.index(topic)+1}. {topic}')

    Основная асинхронная функция:

    async def main():
    # Получение списка всех рубрик
    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=SSL_CERT)) as session:
    html = await fetch(func=session.get, url='')
    soup = bs4.BeautifulSoup(html, 'html.parser')
    topics = soup.find('div', {'id' : 'content'}).find_all(class_='topic')
    topics = [item.a.get('href') for item in topics]
    # Попытка использования ThreadPoolExecutor
    with ThreadPoolExecutor(max_workers=15) as executor:
    for topic in topics:
    executor.submit(await serialize_topic(), topic, topics)
    print('\n**************************\n')
    print(f'Total amount of articles: {len(items_list)}')

    Точка входа:

    if __name__ == '__main__':
    asyncio.run(main())