Created
August 17, 2016 11:16
-
-
Save squeaky-pl/e8f0dbc5c2209af68d7d67ea1aab4aa1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""SQLAlchemy session integration for asyncio. | |
This is hazardous and fragile material. Please don't make changes here unless | |
you really know what you are doing. You need to understand how threads work | |
and why SQLAlchemy does not work with asyncio out of the box. | |
For the reference consult following articles: | |
http://docs.sqlalchemy.org/en/latest/orm/contextual.html#thread-local-scope | |
https://docs.python.org/3/library/asyncio-eventloop.html#executor | |
https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor | |
http://techspot.zzzeek.org/2015/02/15/asynchronous-python-and-databases | |
""" | |
import asyncio | |
import concurrent.futures | |
from functools import partial | |
from sqlalchemy.orm import scoped_session, sessionmaker | |
# this is a thread-scoped session, i.e. each thread gets | |
# its own, private session. | |
session = None | |
loop = asyncio.get_event_loop() | |
def install(engine): | |
global session | |
session = scoped_session(sessionmaker(bind=engine)) | |
async def execute(executor, callback, *args): | |
return await loop.run_in_executor(executor, callback, session, *args) | |
async def middleware(app, handler): | |
"""Provide execute_db on request object, finalize transactions. | |
By default commit transaction, in case of exception rollback, | |
finally dispose the thread-local session. | |
""" | |
assert session, "You need to call install before using this middleware." | |
async def attach_execute_db(request): | |
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: | |
request.execute_db = partial(execute, executor) | |
try: | |
result = await handler(request) | |
except: | |
await loop.run_in_executor(executor, session.rollback) | |
raise | |
else: | |
await loop.run_in_executor(executor, session.commit) | |
finally: | |
await loop.run_in_executor(executor, session.remove) | |
return result | |
return attach_execute_db |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment