@squeaky-pl
Created August 17, 2016 11:16
"""SQLAlchemy session integration for asyncio.

This is hazardous and fragile material. Please don't make changes here unless
you really know what you are doing. You need to understand how threads work
and why SQLAlchemy does not work with asyncio out of the box.

For reference, consult the following articles:

http://docs.sqlalchemy.org/en/latest/orm/contextual.html#thread-local-scope
https://docs.python.org/3/library/asyncio-eventloop.html#executor
https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor
http://techspot.zzzeek.org/2015/02/15/asynchronous-python-and-databases
"""
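import asyncio
import concurrent.futures
from functools import partial

from sqlalchemy.orm import scoped_session, sessionmaker

# This is a thread-scoped session registry, i.e. each thread gets
# its own private session. It stays None until install() is called.
session = None
# The event loop is captured once, at import time.
loop = asyncio.get_event_loop()


def install(engine):
    """Bind the module-level thread-scoped session to the given engine."""
    global session
    session = scoped_session(sessionmaker(bind=engine))


async def execute(executor, callback, *args):
    """Run callback(session, *args) in the executor and await its result."""
    return await loop.run_in_executor(executor, callback, session, *args)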
async def middleware(app, handler):
    """Provide execute_db on the request object and finalize transactions.

    By default the transaction is committed; if the handler raises, it is
    rolled back. In both cases the thread-local session is disposed of
    afterwards.
    """
    assert session is not None, (
        "You need to call install before using this middleware."
    )

    async def attach_execute_db(request):
        # A single-worker pool keeps every database call for this request on
        # the same thread, and therefore on the same thread-local session.
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
            request.execute_db = partial(execute, executor)
            try:
                result = await handler(request)
            except BaseException:
                await loop.run_in_executor(executor, session.rollback)
                raise
            else:
                await loop.run_in_executor(executor, session.commit)
            finally:
                await loop.run_in_executor(executor, session.remove)
            return result

    return attach_execute_db
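
The middleware above relies on one idea: a `ThreadPoolExecutor` with `max_workers=1` runs every blocking call for a given request on the same thread, so thread-local state stays consistent across calls. The following is a minimal sketch of that idea only, not the gist's code. It assumes a `threading.local` object as a stand-in for `scoped_session`, and the names `record_call` and `handle_request` are hypothetical. It also uses `asyncio.run` and `asyncio.get_running_loop` rather than the module-level `loop` used above.

```python
import asyncio
import concurrent.futures
import threading

# Hypothetical stand-in for scoped_session: state private to each thread.
_local = threading.local()


def record_call(label):
    """Blocking callback: append to this thread's private log and return it."""
    if not hasattr(_local, "calls"):
        _local.calls = []
    _local.calls.append(label)
    return threading.get_ident(), list(_local.calls)


async def handle_request(name):
    """Run two blocking calls for one request on a dedicated one-thread pool."""
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        first = await loop.run_in_executor(executor, record_call, name + ":query")
        second = await loop.run_in_executor(executor, record_call, name + ":commit")
    return first, second


async def main():
    # Two "requests" run concurrently, each with its own executor.
    return await asyncio.gather(handle_request("a"), handle_request("b"))


results = asyncio.run(main())
for (first_thread, _), (second_thread, calls) in results:
    # Both calls of a request ran on the same thread and saw the same log.
    assert first_thread == second_thread
    print(calls)
```

Each request sees only its own calls in its log, which is the property that lets the middleware call `session.commit`, `session.rollback` and `session.remove` through the same executor and reach the session the handler used.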