Skip to content

Instantly share code, notes, and snippets.

@mmerickel
Last active November 22, 2018 03:08
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mmerickel/43c136cd24d510ffdcd828ae04d87b72 to your computer and use it in GitHub Desktop.
Save mmerickel/43c136cd24d510ffdcd828ae04d87b72 to your computer and use it in GitHub Desktop.
from alembic import context
from pyramid.paster import setup_logging
from sqlalchemy import create_engine, pool
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
if hasattr(config, 'config_file_name'):
path = config.config_file_name
setup_logging(path)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
import myapp.model.meta.base # noqa
target_metadata = myapp.model.meta.base.metadata
def run_migrations_online():
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
url = config.get_section('db')['url']
engine = create_engine(url, poolclass=pool.NullPool)
connection = engine.connect()
context.configure(
connection=connection,
target_metadata=target_metadata,
transaction_per_migration=True,
)
try:
with context.begin_transaction():
context.run_migrations()
finally:
connection.close()
if context.is_offline_mode():
raise NotImplementedError
else:
run_migrations_online()
import weakref
from alembic.config import Config
from alembic.migration import MigrationContext
from alembic.script import ScriptDirectory
import sqlalchemy as sa
from sqlalchemy.engine.url import make_url
from sqlalchemy.orm import configure_mappers
from .types import json_deserializer, json_serializer
log = __import__('logging').getLogger(__name__)
engine_cache = weakref.WeakValueDictionary()
def get_engine(
settings,
prefix='db.',
check_version=True,
pessimistic=True,
):
engine = None
prefix = prefix or ''
# pull out options because not all of them can be passed to
# the engine_from_config fn
opts = {
k[len(prefix):]: v
for k, v in settings.items()
if k.startswith(prefix)
}
opts.pop('here', None)
connect_timeout = opts.pop('connect_timeout', None)
lock_timeout = opts.pop('lock_timeout', None)
url = make_url(opts['url'])
query = url.query or {}
if connect_timeout:
query['connect_timeout'] = int(int(connect_timeout) / 1000)
if lock_timeout:
query['options'] = f'-c lock_timeout={lock_timeout}'
url.query = query
opts['url'] = url
# attempt to load the engine from the cache
engine_id = opts.pop('engine_id')
if engine_id:
engine = engine_cache.get(engine_id)
if engine:
log.debug('using cached engine for id=%s', engine_id)
# if no engine was found in the cache, load one
if not engine:
engine = sa.engine_from_config(
opts,
prefix='',
isolation_level='READ COMMITTED',
json_serializer=json_serializer,
json_deserializer=json_deserializer,
pool_pre_ping=pessimistic,
)
# assert the engine is at the correct migration
if check_version:
current_version = get_current_version(engine)
log.info('current migration: %s', current_version)
latest_version = get_latest_version(engine)
log.debug('latest migration: %s', latest_version)
if current_version != latest_version:
raise RuntimeError(
'database versions out of sync, %s != %s' % (
current_version, latest_version))
# for some reason the mappers are not properly configured at this stage
# and some model objects were missing relationships such as
# AppInstall.project_site_links
configure_mappers()
# store engine in cache
if engine_id:
engine_cache[engine_id] = engine
return engine
def get_current_version(engine):
conn = engine.connect()
try:
ctx = MigrationContext.configure(conn)
return ctx.get_current_revision()
finally:
conn.close()
def get_latest_version(engine):
cfg = Config()
cfg.set_main_option('script_location', 'myapp.model:migrations')
script = ScriptDirectory.from_config(cfg)
return script.get_current_head()
from pyramid.interfaces import IRequest
from pyramid_services import NewServiceContainer
import transaction
from wired import ServiceRegistry
from myapp.model.meta.engine import get_engine
from myapp.model.meta.session import get_session_factory
from myapp.model.meta.session import get_tm_session
from myapp.utils.settings import asbool
from .login import LoginService
log = __import__('logging').getLogger(__name__)
def make_service_factory(settings, flags=None):
if flags is None:
flags = {}
def flag(name, default):
return asbool(flags.get(name, default))
services = ServiceRegistry()
services.register_singleton(settings, name='settings')
engine = get_engine(settings, pessimistic=flag('pessimistic_engine', True))
services.register_singleton(engine, name='dbengine')
dbmaker = get_session_factory(engine)
services.register_singleton(dbmaker, name='dbmaker')
def tm_factory(services):
return transaction.TransactionManager(explicit=True)
services.register_factory(tm_factory, name='tm')
def db_factory(services):
tm = services.get(name='tm')
return get_tm_session(dbmaker, transaction_manager=tm)
services.register_factory(db_factory, name='db')
mq_source = load_source_from_settings(settings)
def mq_factory(services):
db = services.get(name='db')
tm = services.get(name='tm')
mq = mq_source.bind(db=db, transaction_manager=tm)
return mq
services.register_factory(mq_factory, name='mq')
def login_factory(services):
db = services.get(name='db')
svc = LoginService(db, settings)
return svc
services.register_factory(login_factory, LoginService)
return services
def includeme(config):
settings = config.get_settings()
settings.setdefault('tm.manager_hook', 'pyramid_tm.explicit_manager')
config.include('pyramid_tm')
config.include('pyramid_retry')
config.include('pyramid_services')
services = make_service_factory(settings, flags={
# on web requests we rely on pyramid_retry to handle connection
# errors with the database but in background scripts we want
# to ping first and get a good connection from the pool because
# there is no retry
'pessimistic_engine': False,
})
config.set_service_registry(services)
def on_new_container(event):
services = event.container
request = event.request
# override the default tm in the container with request.tm
# when servicing requests
services.set(request.tm, name='tm')
config.add_subscriber(on_new_container, NewServiceContainer)
import zope.sqlalchemy
from sqlalchemy.orm import sessionmaker
log = __import__('logging').getLogger(__name__)
def get_session_factory(engine):
maker = sessionmaker()
maker.configure(bind=engine)
return maker
def get_tm_session(session_factory, transaction_manager):
session = session_factory()
zope.sqlalchemy.register(session, transaction_manager=transaction_manager)
return session
from pyramid.view import view_config
from myapp import services as S
@view_config(
route_name='login',
)
def login_view(request):
svc = request.find_service(S.LoginService)
...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment