Skip to content

Instantly share code, notes, and snippets.

@NicholasBallard
Created October 15, 2023 15:01
Show Gist options
  • Save NicholasBallard/252e3ad4dcccb1817f2c3c411a8ce36f to your computer and use it in GitHub Desktop.
Save NicholasBallard/252e3ad4dcccb1817f2c3c411a8ce36f to your computer and use it in GitHub Desktop.
SQLAlchemy async database session object
from contextlib import asynccontextmanager, contextmanager
from typing import AsyncIterator, Generator

from sqlalchemy import create_engine
from sqlalchemy.ext.asyncio import async_sessionmaker, AsyncEngine, AsyncSession, create_async_engine
from sqlalchemy.orm import Session, sessionmaker

from scada.core.config import settings
@contextmanager
def get_db(
    conn: str = settings.SQLALCHEMY_DATABASE_URI,
) -> Generator[Session, None, None]:
    """
    Create a database session for the given connection string.

    A new engine and session are created on entry. On exit the session is
    closed and the engine is disposed, so no pooled connections outlive the
    ``with`` block. This context manager does NOT commit: the caller must
    call ``db.commit()`` to persist changes.

    Parameters
    ----------
    conn : str, default=settings.SQLALCHEMY_DATABASE_URI
        The connection string to the database.

    Yields
    ------
    db : Session
        An instance of the database session bound to the new engine.

    Raises
    ------
    Exception
        Any exception raised inside the ``with`` body is re-raised unchanged
        after the session has been rolled back.
    """
    # pool_pre_ping makes the pool test a connection before handing it out.
    engine = create_engine(conn, pool_pre_ping=True)
    try:
        SessionLocal = sessionmaker(autocommit=False, autoflush=True, bind=engine)
        db = SessionLocal()
        try:
            yield db
        except Exception:
            db.rollback()
            # Bare raise keeps the original traceback intact.
            raise
        finally:
            db.close()
    finally:
        # The engine is scoped to this call; release its connection pool so
        # repeated calls do not accumulate open connections.
        engine.dispose()
@asynccontextmanager
async def get_db_async(db_url) -> AsyncIterator[AsyncSession]:
    """Get an async database session for the given database URL.

    Reference [async design pattern](https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html#sqlalchemy.ext.asyncio.async_sessionmaker) here.

    A new async engine and session are created on entry. If the ``async with``
    body finishes without raising, the session is committed; if it raises an
    ``Exception``, the session is rolled back and the exception is re-raised.
    In both cases the session is closed and the engine is disposed on exit.

    Parameters
    ----------
    db_url
        The database URL passed to ``create_async_engine``.

    Yields
    ------
    session : AsyncSession
        The session, created with ``expire_on_commit=False`` so loaded
        attributes are not expired by the commit performed on exit.

    Raises
    ------
    Exception
        Any exception raised inside the body, or by the commit itself, is
        re-raised after the rollback.
    """
    engine: AsyncEngine = create_async_engine(db_url, echo=False)
    async_session: async_sessionmaker[AsyncSession] = async_sessionmaker(
        bind=engine,
        class_=AsyncSession,
        expire_on_commit=False,
    )
    try:
        # The session's own async context manager closes it on exit, so no
        # explicit session.close() is needed.
        async with async_session() as session:
            try:
                yield session
                await session.commit()
            except Exception:
                await session.rollback()
                raise
    finally:
        # The engine is created in function scope; dispose it so pooled
        # connections are closed while the event loop is still running.
        await engine.dispose()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment