Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Async PostgreSQL with FastAPI dependency injection & SQLAlchemy
import os
from abc import ABC, abstractmethod
from typing import AsyncIterator, Optional
import uvicorn
from dotenv import load_dotenv
from fastapi import Depends, FastAPI
from fastapi.responses import JSONResponse
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
# not included in example code
from app.models import Folder
# config.py
load_dotenv()
class Config(ABC):
POSTGRES_USERNAME = "postgres"
POSTGRES_DB_NAME = "postgres"
# localhost for development purposes
POSTGRES_HOST = "localhost"
POSTGRES_PORT = "5432"
# password stored in .env file
POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD")
SQL_COMMAND_ECHO = False
class DevelopmentConfig(Config):
SQL_COMMAND_ECHO = True
class ProductionConfig(Config):
# hostname in Docker network for production
POSTGRES_HOST = "db"
def get_config() -> Config:
env = os.getenv("ENV")
if env == "development":
return DevelopmentConfig()
return ProductionConfig()
config = get_config()
# database/base.py
class Database(ABC):
def __init__(self):
self.async_sessionmaker: Optional[sessionmaker] = None
async def __call__(self) -> AsyncIterator[AsyncSession]:
"""For use with FastAPI Depends"""
if not self.async_sessionmaker:
raise ValueError("async_sessionmaker not available. Run setup() first.")
async with self.async_sessionmaker() as session:
yield session
@abstractmethod
def setup(self) -> None:
...
# database/postgres.py
def get_connection_string(driver: str = "asyncpg") -> str:
return f"postgresql+{driver}://{config.POSTGRES_USERNAME}:{config.POSTGRES_PASSWORD}@{config.POSTGRES_HOST}:{config.POSTGRES_PORT}/{config.POSTGRES_DB_NAME}"
class PostgresDatabase(Database):
def setup(self) -> None:
async_engine = create_async_engine(
get_connection_string(),
echo=config.SQL_COMMAND_ECHO,
)
self.async_sessionmaker = sessionmaker(async_engine, class_=AsyncSession)
# depends.py
db = PostgresDatabase()
# main.py
fast_api = FastAPI()
@fast_api.on_event("startup")
async def setup_db() -> None:
db.setup()
# routes.py
@fast_api.get("/example/")
async def db_query_example(
session: AsyncSession = Depends(db),
) -> JSONResponse:
results = await session.execute(select(Folder))
return results.all()
# run_uvicorn.py
if __name__ == "__main__":
uvicorn.run(
"fastapi_postgres_async_example:fast_api",
)
@dan-osull
Copy link
Author

dan-osull commented Oct 28, 2022

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment