Управление сессиями SQLAlchemy в долгих задачах с Dishka
При построении архитектуры приложения на Python с использованием асинхронного фреймворка и библиотеки Dishka для внедрения зависимостей часто возникает проблема: объект сессии SQLAlchemy «протухает» при выполнении длительных фоновых задач. Типичная цепочка зависимостей - сервис → репозиторий → сессия → фабрика сессий → движок - отлично работает для веб-запросов, где сессия живёт миллисекунды. Но если задача выполняется несколько минут, ошибки обращения к БД становятся неизбежными. Разберём, как правильно организовать работу с базой данных в таких сценариях, сохраняя принципы DI и чистоту кода.
Почему долгая сессия - проблема
Сессия SQLAlchemy - это не просто соединение с БД, а контекст, который отслеживает изменения объектов (dirty checking) и управляет транзакциями. Если держать сессию открытой минутами, растёт риск:
- Конфликтов блокировок - долгая транзакция может блокировать строки таблиц.
- Устаревших данных - кеш сессии расходится с реальным состоянием БД.
- Тайм-аутов соединения - пул соединений может закрыть неактивное соединение.
Поэтому для длительных задач (например, синхронизация товаров с WB) нужно управлять временем жизни сессии.
Три подхода к решению
1. Дробление задачи на подзадачи
Разделите одну большую задачу на три: старт job (короткая сессия), долгая операция без БД, финализация с записью в БД (новая короткая сессия). Плюс - не требует переписывания существующего кода. Минус - дополнительные накладные расходы на сериализацию/десериализацию данных в очереди и риск ошибок при передаче больших объёмов.
2. Передача фабрики сессий в репозиторий
Вместо объекта сессии передавайте в конструктор репозитория async_sessionmaker. Тогда каждый метод репозитория создаёт новую сессию, выполняет запрос и закрывает её. Плюс - сессии живут очень коротко. Минус - управление транзакциями (commit/rollback) приходится выносить в методы репозитория, что нарушает чистоту слоя.
3. Передача фабрики сессий в сервис
Сервис получает async_sessionmaker и внутри методов создаёт сессию, а затем передаёт её в конструктор репозитория. Это сохраняет контроль над транзакцией на уровне сервиса и позволяет легко выполнять rollback при ошибках. Минус - репозиторий создаётся внутри метода сервиса, что формально нарушает принцип Dependency Injection.
Рекомендация: комбинированный подход с UnitOfWork
Лучший способ - внедрить паттерн Unit of Work. Он инкапсулирует управление сессией и транзакциями в отдельном компоненте. Пример реализации с Dishka:
class UnitOfWork:
def __init__(self, session_factory: async_sessionmaker[AsyncSession]):
self._session_factory = session_factory
self.session: AsyncSession | None = None
async def __aenter__(self):
self.session = self._session_factory()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if exc_type:
await self.session.rollback()
else:
await self.session.commit()
await self.session.close()Теперь сервис получает UnitOfWork и использует его как контекстный менеджер внутри длительной задачи. Сессия создаётся только на время обращения к БД, а транзакция контролируется явно. Dishka может предоставлять UnitOfWork со скоупом REQUEST - для коротких задач, или создавать новый экземпляр прямо в коде задачи.
Как адаптировать Dishka для длительных задач
В вашем провайдере замените предоставление сессии на предоставление фабрики сессий и UnitOfWork:
class ProviderDatabase(Provider):
@provide(scope=Scope.APP)
def provide_async_engine(self, config: DatabaseConfig) -> AsyncEngine:
return create_async_engine(url=config.url, echo=config.sqlalchemy_echo)
@provide(scope=Scope.APP)
def provide_async_session_maker(
self, engine: AsyncEngine
) -> async_sessionmaker[AsyncSession]:
return async_sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False)
@provide(scope=Scope.REQUEST)
def provide_uow(self, session_maker: async_sessionmaker[AsyncSession]) -> UnitOfWork:
return UnitOfWork(session_maker)Внутри долгой задачи вы сами управляете временем жизни UnitOfWork:
@broker.task
async def long_task(uow_factory: FromDishka[UnitOfWork]):
# Долгая логика без БД
...
# Короткое обращение к БД
async with uow_factory as uow:
repo = JobRepository(uow.session)
await repo.update_status(...)
# Снова долгая логика
...Таким образом, сессия живёт только внутри блока async with, а DI не нарушается - фабрика UnitOfWork получена через Dishka.
Заключение
Проблема «протухания» сессии в долгих задачах решается не дроблением на микрозадачи и не отказом от DI, а внедрением паттерна UnitOfWork. Dishka отлично подходит для предоставления фабрики сессий и UnitOfWork, а код остаётся чистым и тестируемым. Используйте этот подход в своих проектах на Taskiq, Celery или других очередях - и забудьте об ошибках соединения.