Skip to content

Instantly share code, notes, and snippets.

@mattlong
Forked from carljm/db.py
Created February 27, 2016 03:27
Show Gist options
  • Save mattlong/e880f9df667b1fa7de6b to your computer and use it in GitHub Desktop.
Save mattlong/e880f9df667b1fa7de6b to your computer and use it in GitHub Desktop.
SQLAlchemy and Postgres autocommit
"""
SQLAlchemy, PostgreSQL (psycopg2), and autocommit
See blog post: http://oddbird.net/2014/06/14/sqlalchemy-postgres-autocommit/
"""
from contextlib import contextmanager
from sqlalchemy import create_engine, event
from sqlalchemy.orm import sessionmaker, Session as BaseSession
class Session(BaseSession):
def __init__(self, *a, **kw):
super(Session, self).__init__(*a, **kw)
self._in_atomic = False
@contextmanager
def atomic(self):
"""Transaction context manager.
Will commit the transaction on successful completion of the block, or
roll it back on error.
Supports nested usage (via savepoints).
"""
nested = self._in_atomic
self.begin(nested=nested)
self._in_atomic = True
try:
yield
except:
self.rollback()
raise
else:
self.commit()
finally:
if not nested:
self._in_atomic = False
class Database(object):
def __init__(self, db_uri):
self.engine = create_engine(db_uri, isolation_level="AUTOCOMMIT")
self.Session = sessionmaker(bind=self.engine, class_=Session, autocommit=True)
# Keep track of which DBAPI connection(s) had autocommit turned off for
# a particular transaction object.
dconns_by_trans = {}
@event.listens_for(self.Session, 'after_begin')
def receive_after_begin(session, transaction, connection):
"""When a (non-nested) transaction begins, turn autocommit off."""
dbapi_connection = connection.connection.connection
if transaction.nested:
assert not dbapi_connection.autocommit
return
assert dbapi_connection.autocommit
dbapi_connection.autocommit = False
dconns_by_trans.setdefault(transaction, set()).add(
dbapi_connection)
@event.listens_for(self.Session, 'after_transaction_end')
def receive_after_transaction_end(session, transaction):
"""Restore autocommit anywhere this transaction turned it off."""
if transaction in dconns_by_trans:
for dbapi_connection in dconns_by_trans[transaction]:
assert not dbapi_connection.autocommit
dbapi_connection.autocommit = True
del dconns_by_trans[transaction]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment