Last active
April 4, 2024 21:50
-
-
Save coffeewasmyidea/29dfe95b23397bc2a5658e2e94a0e846 to your computer and use it in GitHub Desktop.
SQLAlchemy 2.0 Database Session Manager (async_sessionmaker, AsyncGenerator[Any, AsyncSession])
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import functools | |
import json | |
from collections.abc import AsyncGenerator | |
from contextlib import asynccontextmanager | |
from typing import Any | |
from crpaylib.infrastructure.logger import get_logger | |
from sqlalchemy.exc import SQLAlchemyError | |
from sqlalchemy.ext.asyncio import AsyncEngine | |
from sqlalchemy.ext.asyncio import AsyncSession | |
from sqlalchemy.ext.asyncio import async_sessionmaker | |
from sqlalchemy.ext.asyncio import create_async_engine | |
from application.settings import settings | |
# Module-wide logger obtained from the project's logging helper.
logger = get_logger()
class DatabaseSessionManager:
    """Hold the async SQLAlchemy engine and the session factory bound to it.

    A new instance is empty: ``init`` must be called before any session can
    be opened, and ``close`` disposes the engine and returns the manager to
    the empty state.

    Dependency Injection

        class EntityDao:
            session: AsyncSession

            def __init__(self, session: AsyncSession = Depends(sessionmanager.session)) -> None:
                self.session = session

            # ...

    Standalone Scripts

        async with sessionmanager.sessionmaker() as session:
            dao = EntityDao(session)
            # ...
    """

    def __init__(self) -> None:
        """Create an uninitialized manager with no engine and no session factory."""
        self.engine: AsyncEngine | None = None
        self._sessionmaker: async_sessionmaker[AsyncSession] | None = None

    def init(self, app_name: str) -> None:
        """Create the engine from ``settings`` and bind a session factory to it.

        Args:
            app_name: Forwarded to the driver as the ``application_name``
                server setting through ``connect_args``.

        Note:
            Calling this again replaces the current engine without disposing
            it; call ``close`` first when re-initializing.
        """
        self.engine = create_async_engine(
            settings.db_dsn,
            echo=settings.db_echo,
            pool_size=settings.db_pool_size,
            max_overflow=settings.db_max_overflow,
            # default=str: values ``json`` cannot encode natively are stringified.
            json_serializer=functools.partial(json.dumps, default=str),
            connect_args={"server_settings": {"application_name": app_name}},
        )
        self._sessionmaker = async_sessionmaker(bind=self.engine, expire_on_commit=False)

    def _require_sessionmaker(self) -> async_sessionmaker[AsyncSession]:
        """Return the session factory, or raise if ``init`` has not been called."""
        if self._sessionmaker is None:
            raise RuntimeError("DatabaseSessionManager is not initialized.")
        return self._sessionmaker

    async def session(self) -> AsyncGenerator[AsyncSession, None]:
        """Yield one session; intended for use as a ``Depends`` dependency.

        The session is closed when the generator is finalized. Exceptions
        raised by the consumer propagate unchanged.

        Raises:
            RuntimeError: If the manager has not been initialized.
        """
        factory = self._require_sessionmaker()
        async with factory() as session:
            yield session

    @asynccontextmanager
    async def sessionmaker(self) -> AsyncGenerator[AsyncSession, None]:
        """Open a session as an async context manager for standalone code.

        If the ``async with`` body raises ``SQLAlchemyError``, the session is
        rolled back, the error is logged, and the error is re-raised.

        Raises:
            RuntimeError: If the manager has not been initialized.
            SQLAlchemyError: Re-raised after the rollback.
        """
        factory = self._require_sessionmaker()
        async with factory() as session:
            try:
                yield session
            except SQLAlchemyError as err:
                await session.rollback()
                logger.error("Session rollback because of exception: {}", err)
                # Without re-raising, ``asynccontextmanager`` would suppress the
                # error and the caller would carry on as if the work succeeded.
                raise

    async def close(self) -> None:
        """Dispose the engine and reset the manager to the uninitialized state.

        Raises:
            RuntimeError: If there is no engine to dispose.
        """
        if self.engine is None:
            raise RuntimeError("DatabaseSessionManager is not initialized.")
        await self.engine.dispose()
        self.engine = None
        self._sessionmaker = None
# Shared module-level instance; it cannot open sessions until ``init`` is called on it.
sessionmanager = DatabaseSessionManager()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment