Skip to content

Instantly share code, notes, and snippets.

@mglowinski93
Last active August 21, 2023 09:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mglowinski93/46725668dfed35155825bf05a162d96d to your computer and use it in GitHub Desktop.
Save mglowinski93/46725668dfed35155825bf05a162d96d to your computer and use it in GitHub Desktop.
Asynchronous SQLAlchemy with routing to multiple databases
from contextlib import asynccontextmanager
from enum import Enum
from sqlalchemy import (
Column,
Integer,
String,
Insert,
Update,
Delete,
)
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from sqlalchemy.orm import declarative_base, Session
class Engines(Enum):
    """Async database engines available for routing, one member per database.

    Each member's ``value`` is the object returned by ``create_async_engine``.
    Both engines are created when this class body executes, with ``echo``
    enabled.
    """

    PRIMARY = create_async_engine(echo=True, url="<your_connection_string>")
    SECONDARY = create_async_engine(echo=True, url="<your_connection_string>")
# Declarative base class that the ORM models in this file inherit from.
Base = declarative_base()
class Tutorial(Base):
    """ORM model mapped to the ``tutorials`` table."""

    __tablename__ = "tutorials"

    # Integer primary key.
    tutorial_id = Column(Integer, primary_key=True)

    # Required name, stored as a string of at most 255 characters.
    name = Column(String(length=255), nullable=False)
class RoutingSession(Session):
    """Synchronous session that selects an engine per operation.

    Write operations are bound to ``Engines.SECONDARY``; everything else is
    bound to ``Engines.PRIMARY``. The sync engines are returned because
    ``get_bind`` runs on the synchronous side of the asyncio session.
    """

    def get_bind(self, mapper=None, clause=None, **kw):
        """Return the sync engine to use for the current operation.

        Args:
            mapper: Optional mapper passed by the session; not used for
                routing.
            clause: Optional SQL construct being executed. ``Insert``,
                ``Update`` and ``Delete`` constructs count as writes.
            **kw: Extra keyword arguments supplied by the session; accepted
                and ignored.

        Returns:
            ``Engines.SECONDARY``'s sync engine for writes, otherwise
            ``Engines.PRIMARY``'s sync engine.
        """
        is_write_statement = isinstance(clause, (Insert, Update, Delete))
        # A unit-of-work flush is a write too, but it may resolve its bind
        # without handing an Insert/Update/Delete clause to this method.
        # Checking ``_flushing`` follows the routing example in the
        # SQLAlchemy documentation, so ORM flushes reach the write engine.
        if self._flushing or is_write_statement:
            return Engines.SECONDARY.value.sync_engine
        return Engines.PRIMARY.value.sync_engine
def async_session_generator():
    """Build a session factory whose sessions route via ``RoutingSession``.

    Returns:
        A new ``async_sessionmaker`` configured with
        ``sync_session_class=RoutingSession``. A fresh factory is created on
        every call.
    """
    session_factory = async_sessionmaker(sync_session_class=RoutingSession)
    return session_factory
@asynccontextmanager
async def get_session():
    """Yield an async session, rolling back if the caller's block fails.

    Intended to be used as ``async with get_session() as session: ...``.

    Yields:
        A session created from the factory returned by
        ``async_session_generator()``.

    Raises:
        Any exception raised inside the caller's ``async with`` body is
        re-raised after the session has been rolled back. Exceptions raised
        while creating the factory or the session propagate unchanged.
    """
    async_session = async_session_generator()
    # The session's own ``async with`` closes it on exit, so no separate
    # ``finally: close()`` is needed.
    async with async_session() as session:
        # The ``try`` sits inside the ``async with`` so ``session`` is always
        # bound when the handler runs; a failure during session creation can
        # no longer be masked by an UnboundLocalError.
        try:
            yield session
        except BaseException:
            # BaseException keeps the original bare-except scope (including
            # task cancellation); the error is always re-raised.
            await session.rollback()
            raise
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment