Управление сессиями SQLAlchemy в долгих задачах с Dishka

    При построении архитектуры приложения на Python с использованием асинхронного фреймворка и библиотеки Dishka для внедрения зависимостей часто возникает проблема: объект сессии SQLAlchemy «протухает» при выполнении длительных фоновых задач. Типичная цепочка зависимостей - сервис → репозиторий → сессия → фабрика сессий → движок - отлично работает для веб-запросов, где сессия живёт миллисекунды. Но если задача выполняется несколько минут, ошибки обращения к БД становятся неизбежными. Разберём, как правильно организовать работу с базой данных в таких сценариях, сохраняя принципы DI и чистоту кода.

    Почему долгая сессия - проблема

    Сессия SQLAlchemy - это не просто соединение с БД, а контекст, который отслеживает изменения объектов (dirty checking) и управляет транзакциями. Если держать сессию открытой минутами, растёт риск:

    • Конфликтов блокировок - долгая транзакция может блокировать строки таблиц.
    • Устаревших данных - кеш сессии расходится с реальным состоянием БД.
    • Тайм-аутов соединения - пул соединений может закрыть неактивное соединение.

    Поэтому для длительных задач (например, синхронизация товаров с WB) нужно управлять временем жизни сессии.

    Три подхода к решению

    1. Дробление задачи на подзадачи

    Разделите одну большую задачу на три: старт job (короткая сессия), долгая операция без БД, финализация с записью в БД (новая короткая сессия). Плюс - не требует переписывания существующего кода. Минус - дополнительные накладные расходы на сериализацию/десериализацию данных в очереди и риск ошибок при передаче больших объёмов.

    2. Передача фабрики сессий в репозиторий

    Вместо объекта сессии передавайте в конструктор репозитория async_sessionmaker. Тогда каждый метод репозитория создаёт новую сессию, выполняет запрос и закрывает её. Плюс - сессии живут очень коротко. Минус - управление транзакциями (commit/rollback) приходится выносить в методы репозитория, что нарушает чистоту слоя.

    3. Передача фабрики сессий в сервис

    Сервис получает async_sessionmaker и внутри методов создаёт сессию, а затем передаёт её в конструктор репозитория. Это сохраняет контроль над транзакцией на уровне сервиса и позволяет легко выполнять rollback при ошибках. Минус - репозиторий создаётся внутри метода сервиса, что формально нарушает принцип Dependency Injection.

    Рекомендация: комбинированный подход с UnitOfWork

    Лучший способ - внедрить паттерн Unit of Work. Он инкапсулирует управление сессией и транзакциями в отдельном компоненте. Пример реализации с Dishka:

    class UnitOfWork:
        def __init__(self, session_factory: async_sessionmaker[AsyncSession]):
            self._session_factory = session_factory
            self.session: AsyncSession | None = None
    
        async def __aenter__(self):
            self.session = self._session_factory()
            return self
    
        async def __aexit__(self, exc_type, exc_val, exc_tb):
            if exc_type:
                await self.session.rollback()
            else:
                await self.session.commit()
            await self.session.close()

    Теперь сервис получает UnitOfWork и использует его как контекстный менеджер внутри длительной задачи. Сессия создаётся только на время обращения к БД, а транзакция контролируется явно. Dishka может предоставлять UnitOfWork со скоупом REQUEST - для коротких задач, или создавать новый экземпляр прямо в коде задачи.

    Как адаптировать Dishka для длительных задач

    В вашем провайдере замените предоставление сессии на предоставление фабрики сессий и UnitOfWork:

    class ProviderDatabase(Provider):
        @provide(scope=Scope.APP)
        def provide_async_engine(self, config: DatabaseConfig) -> AsyncEngine:
            return create_async_engine(url=config.url, echo=config.sqlalchemy_echo)
    
        @provide(scope=Scope.APP)
        def provide_async_session_maker(
            self, engine: AsyncEngine
        ) -> async_sessionmaker[AsyncSession]:
            return async_sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False)
    
        @provide(scope=Scope.REQUEST)
        def provide_uow(self, session_maker: async_sessionmaker[AsyncSession]) -> UnitOfWork:
            return UnitOfWork(session_maker)

    Внутри долгой задачи вы сами управляете временем жизни UnitOfWork:

    @broker.task
    async def long_task(uow_factory: FromDishka[UnitOfWork]):
        # Долгая логика без БД
        ...
        # Короткое обращение к БД
        async with uow_factory as uow:
            repo = JobRepository(uow.session)
            await repo.update_status(...)
        # Снова долгая логика
        ...

    Таким образом, сессия живёт только внутри блока async with, а DI не нарушается - фабрика UnitOfWork получена через Dishka.

    Заключение

    Проблема «протухания» сессии в долгих задачах решается не дроблением на микрозадачи и не отказом от DI, а внедрением паттерна UnitOfWork. Dishka отлично подходит для предоставления фабрики сессий и UnitOfWork, а код остаётся чистым и тестируемым. Используйте этот подход в своих проектах на Taskiq, Celery или других очередях - и забудьте об ошибках соединения.

    Часто задаваемые вопросы