Skip to content

Instantly share code, notes, and snippets.

@groner
Last active September 21, 2022 18:33
Show Gist options
  • Save groner/8bee5b88dea060eaafb0402208771e65 to your computer and use it in GitHub Desktop.
Save groner/8bee5b88dea060eaafb0402208771e65 to your computer and use it in GitHub Desktop.
zope.sqlalchemy/sqlalchemy 1.4/pysqlite failed commit issue
sqlalchemy>=1.4,<1.5
zope.sqlalchemy>=1.6,<1.7
transaction>=3.0,<3.1
import io
import itertools
import re
from textwrap import dedent
import sqlalchemy as sa
import sqlalchemy.orm as orm
import transaction
import zope.sqlalchemy as zsql
def test():
    '''Run the full matrix of controllers, database factories and errors.

    Every combination of transaction controller, database connection factory
    and error provoking function is run once.  Each run writes its trace to
    its own test.<controller>.<factory>.<error>.log file in the current
    directory and prints one "ok" or "not ok" line to stdout.  When a
    controller returns an exception, its class name and message are also
    appended to the run's log as "-- " prefixed lines.
    '''
    transaction_controllers = [
        test_with_session_begin,
        test_with_implicit_transaction_manager,
        test_with_explicit_transaction_manager,
    ]
    connection_factories = [
        default_lazy_sqlite_transaction_db,
        explicit_sqlite_transaction_db,
        # Set the hardcoded DSN in the postgresql connection factory if
        # you uncomment this.
        #postgresql_db,
    ]
    error_provokers = [
        # The immediate error function doesn't show any unexpected
        # behavior, but it's useful to contrast that with the deferred
        # error function.
        #immediate_error,
        deferred_error,
    ]

    matrix = itertools.product(
        transaction_controllers, connection_factories, error_provokers)
    for txf, dbf, errorf in matrix:
        names = (txf.__name__, dbf.__name__, errorf.__name__)
        tracefn = '.'.join(('test',) + names + ('log',))
        with open(tracefn, 'w') as tracefh:
            def trace(*args):
                print(*args, file=tracefh)

            def traced_dbf():
                # Hand the controller a factory already bound to this log.
                return dbf(tracefh=tracefh)

            err = txf(traced_dbf, errorf)
            if err is None:
                print('ok', *names)
                continue

            err_name = err.__class__.__name__
            print('not ok', *names, err_name)
            trace(f'-- exception: {err_name}')
            message = str(err)
            if message:
                # Prefix every line of the message so it reads as a comment
                # in the trace log.
                trace(re.sub(r'^', '-- ', message, flags=re.MULTILINE))
def setup(db):
db.execute(dedent('''\
DROP TABLE IF EXISTS bar
'''))
db.execute(dedent('''\
DROP TABLE IF EXISTS foo
'''))
db.execute(dedent('''\
CREATE TABLE foo (
id integer primary key,
data text)
'''))
db.execute(dedent('''\
CREATE TABLE bar (
id integer primary key,
deferred_foo_id integer
REFERENCES foo(id)
DEFERRABLE INITIALLY DEFERRED,
immediate_foo_id integer
REFERENCES foo(id)
DEFERRABLE INITIALLY IMMEDIATE,
data text)
'''))
def immediate_error(db):
    '''Provoke an immediate IntegrityError.

    Inserts a bar row whose immediate_foo_id is 33 through db.execute().
    '''
    statement = dedent('''\
        INSERT INTO bar(immediate_foo_id,data) VALUES(33,'mouse')
        ''')
    db.execute(statement)
def deferred_error(db):
    '''Provoke a deferred IntegrityError.

    Inserts a bar row whose deferred_foo_id is 33 through db.execute().
    '''
    statement = dedent('''\
        INSERT INTO bar(deferred_foo_id,data) VALUES(33,'mouse')
        ''')
    db.execute(statement)
def benign_query(db):
    '''Run a harmless query that needs a working database connection.

    Counts the rows of bar through db.execute(); the result is discarded.
    '''
    statement = dedent('''\
        SELECT count(*) FROM bar
        ''')
    db.execute(statement)
def default_lazy_sqlite_transaction_db(tracefh=None):
    '''Return a session on a regular in-memory sqlite:/// engine.

    Every new DBAPI connection gets PRAGMA foreign_keys = ON.  When tracefh
    is given, a trace callback printing to tracefh is installed on each
    DBAPI connection, and the engine's begin/commit/rollback events are
    reported to tracefh as well.
    '''
    tracing = tracefh is not None

    def trace(*args):
        print(*args, file=tracefh)

    engine = sa.create_engine('sqlite:///', echo=False)

    def onconnect(dbapi_connection, connection_record):
        # Install the trace callback first so the PRAGMA below is traced too.
        if tracing:
            dbapi_connection.set_trace_callback(trace)
        dbapi_connection.execute('PRAGMA foreign_keys = ON')

    sa.event.listen(engine, 'connect', onconnect)

    if tracing:
        observe_db_transaction_events(engine, trace)

    session_factory = orm.sessionmaker(engine)
    return session_factory()
def explicit_sqlite_transaction_db(tracefh=None):
    '''Return a session on an in-memory sqlite:/// engine that emits BEGIN itself.

    This is the workaround for the lazy transaction start behavior
    described here:
    https://docs.sqlalchemy.org/en/14/dialects/sqlite.html#pysqlite-serializable

    Every new DBAPI connection gets PRAGMA foreign_keys = ON, and each engine
    'begin' event executes 'BEGIN -- ACTUAL' directly on the DBAPI
    connection.  When tracefh is given, a trace callback printing to tracefh
    is installed on each DBAPI connection, and the engine's
    begin/commit/rollback events are reported to tracefh as well.
    '''
    tracing = tracefh is not None

    def trace(*args):
        print(*args, file=tracefh)

    # NOTE(review): the linked recipe sets isolation_level = None on the
    # DBAPI connection inside the connect handler; here it is passed to
    # create_engine() instead -- confirm this has the intended effect.
    engine = sa.create_engine(
        'sqlite:///',
        echo=False,
        isolation_level=None,
    )

    def onconnect(dbapi_connection, connection_record):
        # Install the trace callback first so the PRAGMA below is traced too.
        if tracing:
            dbapi_connection.set_trace_callback(trace)
        dbapi_connection.execute('PRAGMA foreign_keys = ON')

    def onbegin(conn):
        # Reach through the pooled connection to the DBAPI connection and
        # start the transaction explicitly.
        dbapi_connection = conn.connection.connection
        dbapi_connection.execute('BEGIN -- ACTUAL')

    sa.event.listen(engine, 'connect', onconnect)
    sa.event.listen(engine, 'begin', onbegin)

    # Registered after onbegin, so the explicit BEGIN listener is attached
    # ahead of the tracing listener.
    if tracing:
        observe_db_transaction_events(engine, trace)

    session_factory = orm.sessionmaker(engine)
    return session_factory()
def postgresql_db(tracefh=None):
    '''Create a regular postgresql database connection.

    Returns a session on an engine for the hardcoded
    postgresql:///test-deferred-commit-error DSN.

    When tracefh is given, connections are created with the DBAPI's
    extras.LoggingConnection factory (presumably psycopg2's) initialized to
    log to tracefh, and the engine's begin/commit/rollback events are
    reported to tracefh as well.

    When tracefh is None, no custom connection factory is installed.  A
    LoggingConnection that was never initialize()d refuses to create
    cursors, so using it unconditionally would make the untraced engine
    unusable.
    '''
    def trace(*args):
        print(*args, file=tracefh)

    engine = sa.create_engine(
        'postgresql:///test-deferred-commit-error',
        echo=False,
    )

    if tracefh is not None:
        @sa.event.listens_for(engine, 'do_connect')
        def doconnect(dialect, connection_record, args, kwargs):
            # Returning a connection from do_connect replaces the dialect's
            # own connect call.
            connection = dialect.dbapi.connect(
                *args,
                **kwargs,
                connection_factory=dialect.dbapi.extras.LoggingConnection,
            )
            connection.initialize(tracefh)
            return connection

        observe_db_transaction_events(engine, trace)

    Session = orm.sessionmaker(engine)
    return Session()
def observe_db_transaction_events(engine, trace):
    '''Report the engine's begin, commit and rollback events through trace.

    One listener per event is attached to engine.  Each listener ignores the
    connection it is handed and calls trace with a fixed "-- <NAME> event"
    line.
    '''
    def make_listener(line):
        def listener(conn):
            trace(line)
        return listener

    announcements = (
        ('begin', '-- BEGIN event'),
        ('commit', '-- COMMIT event'),
        ('rollback', '-- ROLLBACK event'),
    )
    for event_name, line in announcements:
        sa.event.listen(engine, event_name, make_listener(line))
# These test runners receive a database connection factory and an error
# provoking function. They run the following steps, each in a separate transaction:
# - setup database
# - apply error provoking function
# - apply benign query function
# If the benign query function fails, the resulting exception is returned.
def test_with_session_begin(dbf, errorf):
    '''Run test of dbf and errorf with session.begin().

    On SQLAlchemy 1.4.x each step runs inside "with db.begin():"; on any
    other version it runs inside "with db.transaction:".

    Returns the exception raised by the benign query step, or None when that
    step succeeds.  Raises AssertionError when errorf does not result in an
    IntegrityError.
    '''
    db = dbf()

    if sa.__version__.startswith('1.4.'):
        def transaction_block():
            return db.begin()
    else:
        def transaction_block():
            return db.transaction

    # Step 1: set up the database.
    with transaction_block():
        setup(db)

    # Step 2: the error provoking function must produce an IntegrityError
    # somewhere within its transaction block.
    try:
        with transaction_block():
            errorf(db)
    except sa.exc.IntegrityError:
        pass
    else:
        raise AssertionError('no kaboom')

    # Step 3: a benign query in a fresh transaction; any failure here is the
    # result of the test.
    failure = None
    try:
        with transaction_block():
            benign_query(db)
    except Exception as exc:
        failure = exc
    return failure
def test_with_implicit_transaction_manager(dbf, errorf):
    '''Run test of dbf and errorf with transaction.manager in implicit mode.

    The session from dbf is registered with zope.sqlalchemy against a
    TransactionManager(explicit=False), and each step runs inside
    "with <manager>:".

    Returns the exception raised by the benign query step, or None when that
    step succeeds.  Raises AssertionError when errorf does not result in an
    IntegrityError.
    '''
    db = dbf()
    manager = transaction.TransactionManager(explicit=False)
    session_events = zsql.register(db, transaction_manager=manager)

    # Step 1: set up the database.
    with manager:
        setup(db)
        session_events.mark_changed(db)

    # Step 2: the error provoking function must produce an IntegrityError
    # somewhere within its transaction block.
    try:
        with manager:
            errorf(db)
            session_events.mark_changed(db)
    except sa.exc.IntegrityError:
        pass
    else:
        raise AssertionError('no kaboom')

    # Step 3: a benign query in a fresh transaction; any failure here is the
    # result of the test.
    failure = None
    try:
        with manager:
            benign_query(db)
    except Exception as exc:
        failure = exc
    return failure
def test_with_explicit_transaction_manager(dbf, errorf):
    '''Run test of dbf and errorf with transaction.manager in explicit mode.

    The session from dbf is registered with zope.sqlalchemy against a
    TransactionManager(explicit=True), and each step runs inside
    "with <manager>:".

    Returns the exception raised by the benign query step, or None when that
    step succeeds.  Raises AssertionError when errorf does not result in an
    IntegrityError.
    '''
    db = dbf()
    manager = transaction.TransactionManager(explicit=True)
    session_events = zsql.register(db, transaction_manager=manager)

    # Step 1: set up the database.
    with manager:
        setup(db)
        session_events.mark_changed(db)

    # Step 2: the error provoking function must produce an IntegrityError
    # somewhere within its transaction block.
    try:
        with manager:
            errorf(db)
            session_events.mark_changed(db)
    except sa.exc.IntegrityError:
        pass
    else:
        raise AssertionError('no kaboom')

    # Step 3: a benign query in a fresh transaction; any failure here is the
    # result of the test.
    failure = None
    try:
        with manager:
            benign_query(db)
    except Exception as exc:
        failure = exc
    return failure
# Run the whole test matrix when this file is executed as a script.
if __name__ == '__main__':
    test()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment