Skip to content

Instantly share code, notes, and snippets.

@jasonwalkeryung
Last active October 1, 2018 16:09
Show Gist options
  • Save jasonwalkeryung/f8a8fe4ea0c29a9c02b3330f76fadc2f to your computer and use it in GitHub Desktop.
Save jasonwalkeryung/f8a8fe4ea0c29a9c02b3330f76fadc2f to your computer and use it in GitHub Desktop.
Testing with rollback and multiple dbs
"""
Unit test fixture for testing with read replicas as described in:
https://gist.github.com/jasonwalkeryung/5133383d66782461cdc3b4607ae35d98
"""
import pytest
from sqlalchemy_replica import db
@pytest.fixture
def rollback():
    """Run a test inside per-connection transactions that are rolled back afterwards.

    One connection is opened per distinct engine URL found in ``db.engines``
    and ``db.read_only_engines``, an outer transaction is begun on it, and the
    session factories / scoped sessions registered on ``db`` are re-pointed at
    that connection. When the test finishes (or setup fails part-way) every
    registered scoped session is removed, every outer transaction is rolled
    back, every connection is closed and the engines are disposed.

    Yields:
        ``testapp``.

    NOTE(review): ``Engine``, ``OperationalError``, ``statsd``, ``time``,
    ``MAX_CONNECTION_RETRIES``, ``sessionmaker``, ``scoped_session`` and
    ``testapp`` are neither imported nor defined in the visible part of this
    file -- confirm they are provided elsewhere.
    """
    connections = {}
    transactions = {}
    sessions = {}
    # Engines we actually opened a connection on; these may come from
    # db.read_only_engines and therefore not be covered by db.engines at teardown.
    connected_engines = []

    def connect_engine(_engine: Engine, retries: int = 0):
        """Connect to ``_engine``, retrying on ``OperationalError``.

        Sleeps 0.5s between attempts and re-raises the last error once
        ``retries`` has reached ``MAX_CONNECTION_RETRIES``.
        """
        while True:
            try:
                return _engine.connect()
            except OperationalError:
                statsd.increment("config.test.connection_error")
                if retries >= MAX_CONNECTION_RETRIES:
                    raise
                time.sleep(0.5)
                retries += 1

    # Setup lives inside the ``try`` so that a failure half-way through still
    # releases the connections/transactions that were already opened.
    try:
        # Start a master transaction for each connection.
        # Notice we don't connect per engine; each read-only connection has to share the same
        # connection with its read-write counterpart. Without this, it won't see the commits for
        # our fixture data and any of the other modifications that happen during the test.
        all_engines = dict()  # type: Dict[DBInstance, Engine]
        all_engines.update(db.engines)
        all_engines.update(db.read_only_engines)

        for bind, engine in all_engines.items():
            # An entry may be a single engine or a list of engines.
            engines = engine if isinstance(engine, list) else [engine]
            for eng in engines:
                # Start the master transaction on this connection (if we haven't already)
                if eng.url not in connections:
                    connections[eng.url] = connect_engine(eng)
                    connected_engines.append(eng)
                    transactions[eng.url] = connections[eng.url].begin()

                # Hijack the session factories to attach to this transaction
                db.session_factories[bind] = sessionmaker(
                    bind=connections[eng.url], expire_on_commit=False
                )
                db.scoped_sessions[bind] = scoped_session(db.session_factories[bind])
                db.read_only_scoped_sessions[bind] = [db.scoped_sessions[bind]]
                # Remember the scoped session so teardown can release it.
                sessions[bind] = db.scoped_sessions[bind]

        # Do the test
        yield testapp
    finally:
        # Take back everything we've done: sessions first, then the outer
        # transactions they were joined to, then the connections themselves.
        for session in sessions.values():
            session.remove()
        for transaction in transactions.values():
            transaction.rollback()
        for connection in connections.values():
            connection.close()

        # http://docs.sqlalchemy.org/en/latest/core/connections.html#engine-disposal
        disposed = []
        for _, engine in db.engines.items():
            engine.dispose()
            disposed.append(engine)
        # Also dispose engines we connected through that db.engines does not hold
        # (e.g. read-only engines), so their pools do not keep connections open.
        for engine in connected_engines:
            if not any(engine is done for done in disposed):
                engine.dispose()
                disposed.append(engine)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment