Skip to content

Instantly share code, notes, and snippets.

@panki
Created June 11, 2015 12:50
Show Gist options
  • Save panki/14af23612a69fc5842e5 to your computer and use it in GitHub Desktop.
Save panki/14af23612a69fc5842e5 to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
from contextlib import contextmanager
from functools import wraps
import functools
from payme import types
from . import test
from sqlalchemy import MetaData, create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
# Module-level database state. ``configure`` below creates the engine and
# attaches it to ``metadata`` and to the ``Session`` factory.
engine = None  # stays None until configure() assigns a real engine
metadata = MetaData()
Session = sessionmaker()  # session factory; its options are set in configure()
Model = declarative_base(metadata=metadata)  # declarative base sharing ``metadata``
class Config(types.Struct):
    """Database settings read by ``configure``.

    ``url`` and ``echo`` are handed to ``create_engine``; ``autoflush``,
    ``autocommit`` and ``expire_on_commit`` are handed to the ``Session``
    factory.
    """

    # Field declarations (order kept as-is; Struct may depend on it).
    url = types.String
    echo = types.Bool
    autoflush = types.Bool
    autocommit = types.Bool
    expire_on_commit = types.Bool

    def __init__(
        self,
        url='',
        echo=False,
        autoflush=True,
        autocommit=False,
        expire_on_commit=False,
    ):
        """Store the given settings on the instance after base initialisation."""
        super().__init__()

        # Engine-level options.
        self.url = url
        self.echo = echo

        # Session-level options.
        self.autoflush = autoflush
        self.autocommit = autocommit
        self.expire_on_commit = expire_on_commit
def configure(config):
    """Build the module engine from *config* and wire it into the globals.

    Rebinds the module-level ``engine``, sets it as ``metadata.bind`` and
    configures the ``Session`` factory with the engine and the session
    options carried by *config*.

    :param config: object exposing ``url``, ``echo``, ``autocommit``,
        ``autoflush`` and ``expire_on_commit`` attributes.
    """
    # Only ``engine`` is rebound here; ``metadata`` and ``Session`` are
    # mutated in place, so they need no ``global`` declaration.
    global engine

    # NOTE(review): ``convert_unicode`` looks like a legacy engine flag --
    # confirm it is still accepted by the SQLAlchemy version in use.
    engine = create_engine(config.url, convert_unicode=True, echo=config.echo)
    metadata.bind = engine

    session_options = dict(
        bind=engine,
        autocommit=config.autocommit,
        autoflush=config.autoflush,
        expire_on_commit=config.expire_on_commit,
    )
    Session.configure(**session_options)
@contextmanager
def tx_scope(readonly=False):
    """Context manager yielding a ``Tx`` around a freshly created session.

    On normal exit the transaction is committed unless *readonly* is set.
    If the body (or the commit itself) raises anything, the session is
    rolled back and the exception is re-raised. The session is closed in
    every case.

    :param readonly: when true, the yielded ``Tx`` is readonly and no commit
        is attempted.
    """
    scope = Tx(session=Session(), readonly=readonly)
    try:
        with scope:
            yield scope
            # Commit stays inside the ``try`` so a failing commit is also
            # rolled back by the handler below.
            if scope.writeable:
                scope.commit()
    except BaseException:
        # Deliberately catches everything (same as a bare ``except``):
        # roll back first, then let the original exception propagate.
        scope.rollback()
        raise
    finally:
        scope.close()
def _transactional_wrapper(func, readonly=True):
    """Decorator for declarative transaction demarcation.

    The wrapped function receives its transaction through the ``tx`` keyword
    argument. If the caller supplies a ``Tx`` via ``tx=...``, the call joins
    that transaction; otherwise a new ``tx_scope`` is opened for the duration
    of the call and its ``Tx`` is injected as ``tx``.

    Only a keyword ``tx`` is recognised; a transaction passed positionally is
    not detected.

    Usage::

        @atomic
        def my_method(tx=None):
            user = tx.query(User).get(1)
            user.rename('John Doe')

    :param func: function to wrap; must accept a ``tx`` keyword argument.
    :param readonly: whether the function only reads. A non-readonly function
        may not join a readonly transaction.
    :raises AssertionError: if ``tx`` is not a ``Tx`` instance, or if a
        writing function is called with a readonly ``tx``.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        # Pop (not get) so that an explicit ``tx=None`` from the caller does
        # not collide with the ``tx`` keyword injected below, which would
        # raise "got multiple values for keyword argument 'tx'".
        tx = kwargs.pop('tx', None)

        if tx is None:
            # No surrounding transaction: open a scope just for this call.
            with tx_scope(readonly=readonly) as new_tx:
                return func(*args, tx=new_tx, **kwargs)

        # Raised explicitly (instead of ``assert``) so the guards survive
        # ``python -O``; AssertionError is kept for backward compatibility.
        if not isinstance(tx, Tx):
            raise AssertionError(
                'tx must be a Tx instance, got %r' % (type(tx),))
        if tx.readonly and not readonly:
            raise AssertionError(
                'Writing function called within a readonly transaction')

        # Join the caller's transaction; entering it only tracks nesting.
        with tx:
            return func(*args, tx=tx, **kwargs)

    return wrapper
class Tx:
    """Wrapper around a session that adds a readonly flag and a nesting counter.

    Used as a context manager, it counts how many ``with`` blocks are
    currently entered; entering and leaving has no effect on the session.
    """

    def __init__(self, session, readonly=False):
        """Wrap *session*; *readonly* disables ``flush`` and ``commit``."""
        self._session = session
        self._readonly = readonly
        self._depth_counter = 0

    # -- context manager protocol: nesting bookkeeping only ---------------

    def __enter__(self):
        self._depth_counter += 1
        return self

    def __exit__(self, *exc_info):
        # Returns None, so exceptions from the body always propagate.
        self._depth_counter -= 1

    # -- state ------------------------------------------------------------

    @property
    def readonly(self):
        """True when the transaction was created readonly."""
        return self._readonly

    @property
    def writeable(self):
        """Inverse of ``readonly``."""
        return not self._readonly

    @property
    def free(self):
        """True while no ``with`` block on this object is active."""
        return not self._depth_counter

    # -- transaction control ----------------------------------------------

    def flush(self):
        """Flush the session; refused on a readonly transaction."""
        if self.readonly:
            raise Exception('Flush is not enabled on readonly session')
        self._session.flush()

    def commit(self):
        """Commit the session; refused on a readonly transaction."""
        if self.readonly:
            raise Exception('Commit is not enabled on readonly session')
        self._session.commit()

    def rollback(self):
        """Roll the session back (allowed regardless of ``readonly``)."""
        self._session.rollback()

    def close(self):
        """Close the underlying session."""
        self._session.close()

    # -- plain delegation to the session ----------------------------------

    def add(self, *args, **kwargs):
        """Forward to ``session.add`` and return its result."""
        return self._session.add(*args, **kwargs)

    def query(self, *args, **kwargs):
        """Forward to ``session.query`` and return its result."""
        return self._session.query(*args, **kwargs)

    def filter(self, *args, **kwargs):
        """Forward to ``session.filter`` and return its result."""
        # NOTE(review): SQLAlchemy exposes ``filter`` on Query, not Session --
        # confirm the wrapped session actually provides this attribute.
        return self._session.filter(*args, **kwargs)

    def filter_by(self, *args, **kwargs):
        """Forward to ``session.filter_by`` and return its result."""
        # NOTE(review): same concern as ``filter`` above -- verify.
        return self._session.filter_by(*args, **kwargs)

    def connection(self):
        """Forward to ``session.connection`` and return its result."""
        return self._session.connection()
# Decorator for functions that write: a ``tx`` passed by the caller must not
# be readonly; without one, a new scope is opened with readonly=False.
atomic = functools.partial(_transactional_wrapper, readonly=False)
# Decorator for functions that only read: accepts any ``tx`` passed by the
# caller; without one, a new scope is opened with readonly=True.
fetch = functools.partial(_transactional_wrapper, readonly=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment