Skip to content

Instantly share code, notes, and snippets.

@tkeith
Created September 22, 2015 17:25
Show Gist options
  • Save tkeith/3251bed3b407ff71abd4 to your computer and use it in GitHub Desktop.
Save tkeith/3251bed3b407ff71abd4 to your computer and use it in GitHub Desktop.
SQLAlchemy Transaction Management
dbe = sqlalchemy.create_engine(config.DB_URL, isolation_level='SERIALIZABLE')
dbs = scoped_session(sessionmaker(bind=dbe, autoflush=True, autocommit=False))
in_dbt = False
tdata = dict()
in_retry = False
@contextmanager
def dbt_cm(save=True, top=False, _retry=False):
global in_dbt
global in_nested_dbt
global tdata
global in_retry
prev_in_dbt = in_dbt
if not in_dbt:
log.debug('entering new database transaction')
in_retry = _retry
in_dbt = True
tdata.clear()
else:
if top:
raise Exception('Not top level transaction')
if in_retry and not _retry:
raise Exception('In transaction that could be retried but retry is not true')
dbs.begin_nested()
in_nested_dbt = True
try:
yield
except Exception:
dbs.rollback()
raise
else:
if save:
try:
dbs.commit()
except Exception:
dbs.rollback()
raise
else:
dbs.rollback()
finally:
in_dbt = prev_in_dbt
def dbt_f(retry=True, cache=False, **dbt_kwargs):
def decr(f):
@functools.wraps(f)
def decd(*f_args, **f_kwargs):
while True:
try:
with dbt_cm(_retry=retry, **dbt_kwargs):
if cache:
key = ('tcached', hash(f), tuple(f_args), tuple(sorted(f_kwargs.items())))
if key in tdata:
return tdata[key]
res = f(*f_args, **f_kwargs)
tdata[key] = res
return res
return f(*f_args, **f_kwargs)
except sqlalchemy.exc.DBAPIError as e:
if not isinstance(e.orig, psycopg2.extensions.TransactionRollbackError):
raise
log.debug('transaction rolled back')
if in_dbt or not retry:
raise
log.debug('retrying transaction')
return decd
return decr
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment