Skip to content

Instantly share code, notes, and snippets.

@mmerickel
Last active November 20, 2019 00:38
Show Gist options
  • Save mmerickel/61cd318751a668c32a6f7c59bd36e074 to your computer and use it in GitHub Desktop.
Save mmerickel/61cd318751a668c32a6f7c59bd36e074 to your computer and use it in GitHub Desktop.
wrap a wired service layer into pyramid_services
from myapp.services import make_service_factory
from myapp.utils.tm import tm_context
class CLI:
@reify
def service_factory(self):
return make_service_factory(self.settings)
@contextmanager
def services_context(self):
services = self.service_factory.create_container()
tm = services.get(name='tm')
with tm_context(tm):
yield services
from pyramid_services import NewServiceContainer
from wired import ServiceRegistry
def make_service_factory(settings):
services = ServiceRegistry()
services.register_singleton(settings, name='settings')
engine = get_engine(settings)
services.register_singleton(engine, name='dbengine')
dbmaker = get_session_factory(engine)
services.register_singleton(dbmaker, name='dbmaker')
def tm_factory(services):
return transaction.TransactionManager(explicit=True)
services.register_factory(tm_factory, name='tm')
def db_factory(services):
tm = services.get(name='tm')
dbmaker = services.get(name='dbmaker')
return get_tm_session(dbmaker, transaction_manager=tm)
services.register_factory(db_factory, name='db')
return services
def includeme(config):
settings = config.get_settings()
settings.setdefault('tm.manager_hook', 'pyramid_tm.explicit_manager')
config.include('pyramid_tm')
config.include('pyramid_services')
services = make_service_factory(settings)
config.set_service_registry(services)
def on_new_container(event):
services = event.container
request = event.request
services.register_singleton(request.tm, name='tm')
config.add_subscriber(on_new_container, NewServiceContainer)
from contextlib import contextmanager, suppress
from transaction.interfaces import NoTransaction
from myapp.utils.retry import mark_error_retryable
class DoomedAbort(Exception):
pass
@contextmanager
def tm_context(tm):
tm.begin()
try:
yield tm
with suppress(NoTransaction):
if tm.isDoomed():
raise DoomedAbort
tm.commit()
except DoomedAbort:
with suppress(NoTransaction):
tm.abort()
except BaseException as ex:
with suppress(NoTransaction):
txn = tm.get()
if txn.isRetryableError(ex):
mark_error_retryable(ex)
tm.abort()
raise ex
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment