Last active
October 1, 2018 16:09
-
-
Save jasonwalkeryung/f8a8fe4ea0c29a9c02b3330f76fadc2f to your computer and use it in GitHub Desktop.
Testing with rollback and multiple dbs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Unit test fixture for testing with read replicas, as described in:
https://gist.github.com/jasonwalkeryung/5133383d66782461cdc3b4607ae35d98
""" | |
import pytest | |
from sqlalchemy_replica import db | |
@pytest.fixture
def rollback():
    """Run a test inside transactions that are rolled back when it finishes.

    One connection is opened per distinct engine URL found in ``db.engines``
    and ``db.read_only_engines``, and an outer transaction is begun on it.
    The session factories and scoped sessions on ``db`` are then replaced,
    per bind, with ones bound to that connection, so reads and writes made
    during the test go through the same open transaction.

    Yields:
        ``testapp``.

    Teardown removes the scoped sessions installed here, rolls back every
    outer transaction, closes every connection and disposes the engines.
    Every teardown step is attempted even if an earlier one raises.
    """
    import contextlib

    # NOTE(review): ``testapp``, ``Engine``, ``OperationalError``, ``statsd``,
    # ``MAX_CONNECTION_RETRIES``, ``time``, ``sessionmaker`` and
    # ``scoped_session`` are not imported or defined in the visible part of
    # this file -- confirm they are provided elsewhere (``testapp`` may have
    # been intended as a fixture argument).
    connections = {}  # engine URL -> connection opened by this fixture
    transactions = {}  # engine URL -> outer transaction begun on that connection
    sessions = {}  # bind -> scoped session installed on ``db`` by this fixture
    connected_engines = []  # engines a connection was actually opened through

    def connect_engine(_engine: Engine, retries: int = 0):
        """Return ``_engine.connect()``, retrying on ``OperationalError``.

        Each failure increments the ``config.test.connection_error`` metric.
        After ``MAX_CONNECTION_RETRIES`` retries the last error is re-raised.
        """
        while True:
            try:
                return _engine.connect()
            except OperationalError:
                statsd.increment("config.test.connection_error")
                if retries >= MAX_CONNECTION_RETRIES:
                    raise
                time.sleep(0.5)
                retries += 1

    try:
        # Setup lives inside the ``try`` so that a failure part-way through
        # (e.g. the second engine cannot connect) still releases whatever was
        # already opened.
        #
        # Start a master transaction for each connection.
        # Notice we don't connect per engine; each read-only connection has to
        # share the same connection with its read-write counterpart. Without
        # this, it won't see the commits for our fixture data and any of the
        # other modifications that happen during the test.
        all_engines = dict()  # type: Dict[DBInstance, Engine]
        all_engines.update(db.engines)
        all_engines.update(db.read_only_engines)
        for bind, engine in all_engines.items():
            # A bind may map to a single engine or to a list of engines.
            engines = engine if isinstance(engine, list) else [engine]
            for eng in engines:
                # Start the master transaction on this connection (if we
                # haven't already).
                if eng.url not in connections:
                    connections[eng.url] = connect_engine(eng)
                    transactions[eng.url] = connections[eng.url].begin()
                    connected_engines.append(eng)

                # Hijack the session factories to attach to this transaction.
                db.session_factories[bind] = sessionmaker(
                    bind=connections[eng.url], expire_on_commit=False
                )
                db.scoped_sessions[bind] = scoped_session(db.session_factories[bind])
                db.read_only_scoped_sessions[bind] = [db.scoped_sessions[bind]]
                # Remember what we installed so teardown can release it.
                sessions[bind] = db.scoped_sessions[bind]

        # Do the test
        yield testapp
    finally:
        # Take back everything we've done. ExitStack runs callbacks in reverse
        # registration order and keeps going if one of them raises, so they
        # are registered last-step-first: sessions are removed, then
        # transactions rolled back, then connections closed, then engines
        # disposed.
        with contextlib.ExitStack() as cleanup:
            # http://docs.sqlalchemy.org/en/latest/core/connections.html#engine-disposal
            # Dispose the read-write engines plus any engine this fixture
            # connected through; de-duplicated by identity.
            to_dispose = {
                id(eng): eng
                for eng in list(db.engines.values()) + connected_engines
            }
            for eng in to_dispose.values():
                cleanup.callback(eng.dispose)
            for connection in connections.values():
                cleanup.callback(connection.close)
            for transaction in transactions.values():
                cleanup.callback(transaction.rollback)
            for session in sessions.values():
                # ``remove()`` closes the current session, if any, and
                # discards it from the scoped-session registry.
                cleanup.callback(session.remove)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment