Skip to content

Instantly share code, notes, and snippets.

@coffeewasmyidea
Last active April 4, 2024 21:50
Show Gist options
  • Save coffeewasmyidea/29dfe95b23397bc2a5658e2e94a0e846 to your computer and use it in GitHub Desktop.
Save coffeewasmyidea/29dfe95b23397bc2a5658e2e94a0e846 to your computer and use it in GitHub Desktop.
SQLAlchemy 2.0 Database Session Manager (async_sessionmaker, AsyncGenerator[Any, AsyncSession])
import functools
import json
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from typing import Any
from crpaylib.infrastructure.logger import get_logger
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncEngine
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.ext.asyncio import create_async_engine
from application.settings import settings
logger = get_logger()
class DatabaseSessionManager:
"""
Dependency Injection
class EntityDao:
session: AsyncSession
def __init__(self, session: AsyncSession = Depends(sessionmanager.session)) -> None:
self.session = session
# ...
Standalone Scripts
async with sessionmanager.sessionmaker() as session:
dao = EntityDao(session)
# ...
"""
def __init__(self) -> None:
self.engine: AsyncEngine | None = None
self._sessionmaker: async_sessionmaker | None = None
def init(self, app_name: str) -> None:
self.engine: AsyncEngine | None = create_async_engine(
settings.db_dsn,
echo=settings.db_echo,
pool_size=settings.db_pool_size,
max_overflow=settings.db_max_overflow,
json_serializer=functools.partial(json.dumps, default=str),
connect_args={"server_settings": {"application_name": app_name}},
)
self._sessionmaker = async_sessionmaker(bind=self.engine, expire_on_commit=False)
async def session(self) -> AsyncGenerator[Any, AsyncSession]:
if self._sessionmaker is None:
raise Exception("DatabaseSessionManager is not initialized.")
async with self._sessionmaker() as session:
yield session
@asynccontextmanager
async def sessionmaker(self) -> AsyncGenerator[AsyncSession, Any]:
if self._sessionmaker is None:
raise Exception("DatabaseSessionManager is not initialized.")
async with self._sessionmaker() as session:
try:
yield session
except SQLAlchemyError as err:
await session.rollback()
logger.error("Session rollback because of exception: {}", err)
async def close(self) -> None:
if self.engine is None:
raise Exception("DatabaseSessionManager is not initialized.")
await self.engine.dispose()
self.engine = None
self._sessionmaker = None
sessionmanager = DatabaseSessionManager()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment