-
-
Save groner/8bee5b88dea060eaafb0402208771e65 to your computer and use it in GitHub Desktop.
zope.sqlalchemy/sqlalchemy 1.4/pysqlite failed commit issue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
sqlalchemy>=1.4,<1.5
zope.sqlalchemy>=1.6,<1.7
transaction>=3.0,<3.1
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io | |
import itertools | |
import re | |
from textwrap import dedent | |
import sqlalchemy as sa | |
import sqlalchemy.orm as orm | |
import transaction | |
import zope.sqlalchemy as zsql | |
def test():
    '''Run every combination of transaction controller, database factory
    and error provoking function, printing one result line per combination.

    Each combination writes a trace log named
    ``test.<controller>.<factory>.<error>.log`` in the current directory.
    A combination is reported as ``ok`` when the controller returns
    ``None``; otherwise it is reported as ``not ok`` together with the class
    name of the returned exception, and the exception is appended to the
    trace log as ``-- `` prefixed lines.
    '''
    transaction_controllers = [
        test_with_session_begin,
        test_with_implicit_transaction_manager,
        test_with_explicit_transaction_manager,
    ]
    database_factories = [
        default_lazy_sqlite_transaction_db,
        explicit_sqlite_transaction_db,
        # Set the hardcoded DSN in the postgresql connection factory if
        # you uncomment this.
        #postgresql_db,
    ]
    error_provokers = [
        # The immediate error function doesn't show any unexpected
        # behavior, but it's useful to contrast that with the deferred
        # error function.
        #immediate_error,
        deferred_error,
    ]

    combinations = itertools.product(
        transaction_controllers, database_factories, error_provokers)
    for txf, dbf, errorf in combinations:
        tracefn = f'test.{txf.__name__}.{dbf.__name__}.{errorf.__name__}.log'
        with open(tracefn, 'w') as tracefh:

            def trace(*a):
                print(*a, file=tracefh)

            def traced_dbf():
                # Hand the open log file to the factory so that driver level
                # tracing lands in the same file as our own messages.
                return dbf(tracefh=tracefh)

            err = txf(traced_dbf, errorf)
            if err is None:
                print('ok', txf.__name__, dbf.__name__, errorf.__name__)
                continue

            error_name = err.__class__.__name__
            print('not ok', txf.__name__, dbf.__name__, errorf.__name__, error_name)
            trace(f'-- exception: {error_name}')
            message = str(err)
            if message:
                # Prefix every line of the message so it reads as an SQL
                # comment inside the trace log.
                trace(re.sub(r'^', '-- ', message, flags=re.MULTILINE))
def setup(db):
    '''Drop and recreate the ``foo`` and ``bar`` tables through ``db``.

    ``bar`` carries two foreign keys to ``foo``: one declared
    ``DEFERRABLE INITIALLY DEFERRED`` and one declared
    ``DEFERRABLE INITIALLY IMMEDIATE``.
    '''
    # Order matters: bar references foo, so bar is dropped first and
    # created last.
    statements = (
        '''\
        DROP TABLE IF EXISTS bar
        ''',
        '''\
        DROP TABLE IF EXISTS foo
        ''',
        '''\
        CREATE TABLE foo (
            id integer primary key,
            data text)
        ''',
        '''\
        CREATE TABLE bar (
            id integer primary key,
            deferred_foo_id integer
                REFERENCES foo(id)
                DEFERRABLE INITIALLY DEFERRED,
            immediate_foo_id integer
                REFERENCES foo(id)
                DEFERRABLE INITIALLY IMMEDIATE,
            data text)
        ''',
    )
    for statement in statements:
        db.execute(dedent(statement))
def immediate_error(db):
    '''Insert a ``bar`` row whose ``immediate_foo_id`` points at a missing
    ``foo`` row, to provoke an immediate IntegrityError.
    '''
    statement = dedent('''\
        INSERT INTO bar(immediate_foo_id,data) VALUES(33,'mouse')
        ''')
    db.execute(statement)
def deferred_error(db):
    '''Insert a ``bar`` row whose ``deferred_foo_id`` points at a missing
    ``foo`` row, to provoke a deferred IntegrityError.
    '''
    statement = dedent('''\
        INSERT INTO bar(deferred_foo_id,data) VALUES(33,'mouse')
        ''')
    db.execute(statement)
def benign_query(db):
    '''Run a harmless row count on ``bar``; it only needs a working
    database connection to succeed.
    '''
    statement = dedent('''\
        SELECT count(*) FROM bar
        ''')
    db.execute(statement)
def default_lazy_sqlite_transaction_db(tracefh=None):
    '''Return a session bound to a fresh in-memory ``sqlite:///`` engine.

    No transaction related settings are changed on the engine or the
    driver. Foreign key enforcement is switched on for every new DBAPI
    connection.

    When ``tracefh`` is given, statements seen by the driver and the
    engine's begin/commit/rollback events are written to that file object.
    '''
    tracing = tracefh is not None

    def trace(*a):
        print(*a, file=tracefh)

    engine = sa.create_engine('sqlite:///', echo=False)

    def onconnect(dbapi_connection, connection_record):
        # Install the trace callback first so the PRAGMA itself is traced.
        if tracing:
            dbapi_connection.set_trace_callback(trace)
        dbapi_connection.execute('PRAGMA foreign_keys = ON')

    sa.event.listen(engine, 'connect', onconnect)

    if tracing:
        observe_db_transaction_events(engine, trace)

    session_factory = orm.sessionmaker(engine)
    return session_factory()
def explicit_sqlite_transaction_db(tracefh=None):
    '''Return a session on an in-memory ``sqlite:///`` engine that emits
    its own BEGIN statement.

    This follows the workaround for the driver's lazy transaction start
    that is described here:
    https://docs.sqlalchemy.org/en/14/dialects/sqlite.html#pysqlite-serializable

    When ``tracefh`` is given, statements seen by the driver and the
    engine's begin/commit/rollback events are written to that file object.
    '''
    def trace(*a):
        print(*a, file=tracefh)

    # NOTE(review): the linked recipe sets ``isolation_level = None`` on the
    # DBAPI connection inside the connect hook; here it is passed to
    # create_engine() instead -- confirm this has the intended effect.
    engine = sa.create_engine('sqlite:///', echo=False, isolation_level=None)

    @sa.event.listens_for(engine, 'connect')
    def onconnect(dbapi_connection, connection_record):
        # Install the trace callback first so the PRAGMA itself is traced.
        if tracefh is not None:
            dbapi_connection.set_trace_callback(trace)
        dbapi_connection.execute('PRAGMA foreign_keys = ON')

    @sa.event.listens_for(engine, 'begin')
    def onbegin(conn):
        # Issue the BEGIN directly on the underlying driver connection; the
        # trailing SQL comment makes it easy to spot in the trace log.
        driver_connection = conn.connection.connection
        driver_connection.execute('BEGIN -- ACTUAL')

    # Registered after onbegin so the '-- BEGIN event' marker is hooked up
    # after the listener that emits the real BEGIN.
    if tracefh is not None:
        observe_db_transaction_events(engine, trace)

    session_factory = orm.sessionmaker(engine)
    return session_factory()
def postgresql_db(tracefh=None):
    '''Create a regular postgresql database connection.

    The DSN is hardcoded; adjust it before enabling this factory.

    When ``tracefh`` is given, connections are created with the driver's
    ``LoggingConnection`` factory (assumes the psycopg2 driver) and
    initialized to log to ``tracefh``, and the engine's
    begin/commit/rollback events are written to the same file object.

    When ``tracefh`` is ``None`` the engine connects normally. A
    ``LoggingConnection`` that was never ``initialize()``d refuses to
    create cursors, so the logging factory must not be installed in that
    case.
    '''
    engine = sa.create_engine(
        'postgresql:///test-deferred-commit-error',
        echo=False,
    )

    if tracefh is not None:
        def trace(*a):
            print(*a, file=tracefh)

        @sa.event.listens_for(engine, 'do_connect')
        def doconnect(dialect, connection_record, args, kwargs):
            # Take over connection creation so the logging connection class
            # can be used and pointed at the trace file before first use.
            connection = dialect.dbapi.connect(
                *args,
                **kwargs,
                connection_factory=dialect.dbapi.extras.LoggingConnection,
            )
            connection.initialize(tracefh)
            return connection

        observe_db_transaction_events(engine, trace)

    Session = orm.sessionmaker(engine)
    db = Session()
    return db
def observe_db_transaction_events(engine, trace):
    '''Report the engine's begin, commit and rollback events via ``trace``.

    ``trace`` is called with a single marker string each time one of the
    three events fires on ``engine``.
    '''
    markers = (
        ('begin', '-- BEGIN event'),
        ('commit', '-- COMMIT event'),
        ('rollback', '-- ROLLBACK event'),
    )

    def make_listener(marker):
        # A factory gives each listener its own marker; a closure created
        # directly in the loop would see only the last one.
        def listener(conn):
            trace(marker)
        return listener

    for event_name, marker in markers:
        sa.event.listen(engine, event_name, make_listener(marker))
# Each test runner receives a database connection factory and an error
# provoking function, and runs the following steps, each in its own
# transaction:
# - set up the database
# - apply the error provoking function
# - apply the benign query function
# If the benign query function fails, the resulting exception is returned.
def test_with_session_begin(dbf, errorf):
    '''Run test of dbf and errorf using the session's own transactions.

    On SQLAlchemy 1.4 each step runs inside ``session.begin()``; on any
    other version it runs inside ``session.transaction``.

    Returns the exception raised by the final benign query step, or
    ``None`` when that step succeeds. Raises ``AssertionError`` when
    ``errorf`` does not produce an ``IntegrityError``.
    '''
    db = dbf()

    if sa.__version__.startswith('1.4.'):
        def open_transaction():
            return db.begin()
    else:
        def open_transaction():
            return db.transaction

    with open_transaction():
        setup(db)

    provoked = False
    try:
        with open_transaction():
            errorf(db)
    except sa.exc.IntegrityError:
        provoked = True
    if not provoked:
        raise AssertionError('no kaboom')

    outcome = None
    try:
        with open_transaction():
            benign_query(db)
    except Exception as exc:
        # Hand the failure back to the caller for reporting.
        outcome = exc
    return outcome
def test_with_implicit_transaction_manager(dbf, errorf):
    '''Run test of dbf and errorf under a ``transaction.TransactionManager``
    created with ``explicit=False``.

    Returns the exception raised by the final benign query step, or
    ``None`` when that step succeeds. Raises ``AssertionError`` when
    ``errorf`` does not produce an ``IntegrityError``.
    '''
    db = dbf()
    manager = transaction.TransactionManager(explicit=False)
    registration = zsql.register(db, transaction_manager=manager)

    with manager:
        setup(db)
        # The statements are raw SQL, so the session is flagged as changed
        # by hand.
        registration.mark_changed(db)

    provoked = False
    try:
        with manager:
            errorf(db)
            registration.mark_changed(db)
    except sa.exc.IntegrityError:
        provoked = True
    if not provoked:
        raise AssertionError('no kaboom')

    outcome = None
    try:
        with manager:
            benign_query(db)
    except Exception as exc:
        # Hand the failure back to the caller for reporting.
        outcome = exc
    return outcome
def test_with_explicit_transaction_manager(dbf, errorf):
    '''Run test of dbf and errorf under a ``transaction.TransactionManager``
    created with ``explicit=True``.

    Returns the exception raised by the final benign query step, or
    ``None`` when that step succeeds. Raises ``AssertionError`` when
    ``errorf`` does not produce an ``IntegrityError``.
    '''
    db = dbf()
    manager = transaction.TransactionManager(explicit=True)
    registration = zsql.register(db, transaction_manager=manager)

    with manager:
        setup(db)
        # The statements are raw SQL, so the session is flagged as changed
        # by hand.
        registration.mark_changed(db)

    provoked = False
    try:
        with manager:
            errorf(db)
            registration.mark_changed(db)
    except sa.exc.IntegrityError:
        provoked = True
    if not provoked:
        raise AssertionError('no kaboom')

    outcome = None
    try:
        with manager:
            benign_query(db)
    except Exception as exc:
        # Hand the failure back to the caller for reporting.
        outcome = exc
    return outcome
# Run the whole test matrix only when this file is executed as a script.
if __name__ == '__main__':
    test()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment