Skip to content

Instantly share code, notes, and snippets.

@jdavcs
Last active November 27, 2023 18:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jdavcs/1402a63c8afc9150646e809f3d0ae2d9 to your computer and use it in GitHub Desktop.
Save jdavcs/1402a63c8afc9150646e809f3d0ae2d9 to your computer and use it in GitHub Desktop.
nested transactions demo
import contextlib
import logging
from sqlalchemy import Column, Integer, String, create_engine, select
from sqlalchemy.orm import declarative_base, sessionmaker
#################### setup ####################
# CONNECTION-STRING is a placeholder: replace it with a real database URL
# before running (as written, Python parses it as `CONNECTION - STRING`,
# which raises NameError).
engine = create_engine(CONNECTION-STRING)
Base = declarative_base()


class Job(Base):
    # Minimal job table used by the demos: integer primary key plus a
    # free-form state string ('new' -> 'updated' in the captured runs).
    __tablename__ = "job"
    id = Column(Integer, primary_key=True)
    state = Column(String)


# Create the "job" table if it is missing. The demos assume it already holds
# rows (the captured runs use ids 13-17, all starting in state 'new').
Base.metadata.create_all(engine)
@contextlib.contextmanager
def transaction(session):
    """Run the ``with`` body inside a transaction on *session*.

    If *session* already has a transaction in progress, the body simply joins
    it: nothing is begun or ended here. Otherwise a new transaction is opened
    with ``session.begin()`` for the duration of the body (per SQLAlchemy's
    documented context-manager behaviour, it commits on normal exit and rolls
    back if the body raises).
    """
    if session.in_transaction():
        # Caller already owns a transaction; reuse it untouched.
        yield
        return
    with session.begin():
        yield
def get_jobs(session):
    """Return all Job rows as ORM objects, ordered by ascending id."""
    jobs_by_id = select(Job).order_by(Job.id)
    return session.scalars(jobs_by_id)
def get_session():
    """Build and return a brand-new Session bound to the module-level engine.

    autoflush is disabled, so queries do not implicitly flush pending
    attribute changes; they are written at explicit flush points such as
    commit or begin_nested.
    """
    session_factory = sessionmaker(bind=engine, autoflush=False, autocommit=False)
    return session_factory()
#################### tests ####################
# Each demo below iterates over all jobs and calls change_state(), which raises
# for the job whose id equals the module-level ERROR_JOB_ID.
"""For all demos: set ERROR_JOB_ID to simulate error at a given step in the iteration."""
def run_original_version():
    """
    Demo: one outer transaction, each job in its own SAVEPOINT, errors NOT caught.

    Expected result:
    - jobs processed before error should be committed
    - errored job: rolled back
    - subsequent jobs: not reached
    """
    with get_session() as session, session.begin():
        try:
            for job in get_jobs(session):
                print(f"processing job {job.id}")
                # Each job gets its own SAVEPOINT. An exception raised inside
                # change_state() exits this block and is not caught below,
                # so the loop stops at the failing job.
                with session.begin_nested():
                    change_state(session, job, ERROR_JOB_ID)
        finally:
            # Runs even while an exception is propagating, so the work done
            # before the failing job is committed and the exception then
            # escapes (the captured run shows COMMIT followed by the traceback).
            session.commit()
def run_17085():
    """
    Demo of the approach from #17085: no outer transaction, errors caught per job.

    Expected result:
    - jobs processed before error should be committed
    - errored job: NOT rolled back
    - subsequent jobs: ??? (in the captured run they were committed, and the
      errored job's pending change was flushed together with the next job's
      commit, leaving it 'updated' as well)
    Ref: #17085
    """
    with get_session() as session:
        for job in get_jobs(session):
            print(f"processing job {job.id}")
            try:
                change_state(session, job, ERROR_JOB_ID)
            except Exception:
                # No rollback is issued here, so the failed job's modified
                # state can remain pending in the session and be flushed by a
                # later commit.
                logging.error("Error while recovering job %s during application startup.", job.id)
        # Final commit of anything still pending once the loop is done.
        with transaction(session):
            session.commit()
def run_corrected():
    """
    Corrected demo: one outer transaction, each job in its own SAVEPOINT,
    errors caught per job so the loop continues.

    Expected result:
    - jobs processed before error should be committed
    - errored job: the outer transaction is NOT rolled back; only the job's own
      SAVEPOINT is (the captured run leaves that job in state 'new')
    - subsequent jobs: committed
    """
    with get_session() as session, session.begin():
        try:
            for job in get_jobs(session):
                print(f"processing job {job.id}")
                try:
                    # An exception leaving begin_nested() discards only this
                    # job's SAVEPOINT; the outer transaction carries on.
                    # NOTE(review): this assumes change_state() commits at most
                    # once inside the nested block. The review discussion shows
                    # a second commit in the same begin_nested() block raising
                    # InvalidRequestError.
                    with session.begin_nested():
                        change_state(session, job, ERROR_JOB_ID)
                except Exception:
                    logging.error("Error while recovering job %s during application startup.", job.id)
        finally:
            # Commit the outer transaction with every successful job's changes.
            session.commit()
def change_state(session, job, error_id=None):
    """Set *job*'s state to "updated" and commit; raise if its id equals *error_id*.

    Simulates per-job work. ``error_id`` (ERROR_JOB_ID in the demos) selects the
    job that fails, and that job fails *after* its state attribute has already
    been modified, so callers must decide what happens to that pending change.
    """
    # Joins the caller's transaction if one is active; otherwise begins one.
    with transaction(session):
        job.state = "updated"
        if job.id == error_id:
            raise Exception("MY EXCEPTION") # simulate error
        # NOTE(review): assumes this is the only commit inside any enclosing
        # session.begin()/begin_nested() block. Committing again within the
        # same block raises InvalidRequestError (see the review discussion).
        session.commit()
# Job id on which change_state() raises; every demo reads this module-level value.
ERROR_JOB_ID = 15
# Choose the demo to run by (un)commenting the calls below. The captured runs
# reset every job's state to 'new' between demos.
#run_original_version()
#run_17085()
run_corrected()
# state before running
sa20test5=# select * from job order by id;
id | state
----+-------
13 | new
14 | new
15 | new
16 | new
17 | new
(5 rows)
# run original
(.venv_sa14) rivendell$ python demo_nested_transaction_17085.py
2023-11-27 11:39:09,707 INFO sqlalchemy.engine.Engine select pg_catalog.version()
2023-11-27 11:39:09,707 INFO sqlalchemy.engine.Engine [raw sql] {}
2023-11-27 11:39:09,709 INFO sqlalchemy.engine.Engine select current_schema()
2023-11-27 11:39:09,709 INFO sqlalchemy.engine.Engine [raw sql] {}
2023-11-27 11:39:09,709 INFO sqlalchemy.engine.Engine show standard_conforming_strings
2023-11-27 11:39:09,709 INFO sqlalchemy.engine.Engine [raw sql] {}
2023-11-27 11:39:09,709 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2023-11-27 11:39:09,709 INFO sqlalchemy.engine.Engine select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where pg_catalog.pg_table_is_visible(c.oid) and relname=%(name)s
2023-11-27 11:39:09,710 INFO sqlalchemy.engine.Engine [generated in 0.00008s] {'name': 'job'}
2023-11-27 11:39:09,710 INFO sqlalchemy.engine.Engine COMMIT
2023-11-27 11:39:09,710 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2023-11-27 11:39:09,711 INFO sqlalchemy.engine.Engine SELECT job.id, job.state
FROM job ORDER BY job.id
2023-11-27 11:39:09,711 INFO sqlalchemy.engine.Engine [generated in 0.00006s] {}
processing job 13
2023-11-27 11:39:09,712 INFO sqlalchemy.engine.Engine SAVEPOINT sa_savepoint_1
2023-11-27 11:39:09,712 INFO sqlalchemy.engine.Engine [no key 0.00005s] {}
2023-11-27 11:39:09,713 INFO sqlalchemy.engine.Engine UPDATE job SET state=%(state)s WHERE job.id = %(job_id)s
2023-11-27 11:39:09,713 INFO sqlalchemy.engine.Engine [generated in 0.00007s] {'state': 'updated', 'job_id': 13}
2023-11-27 11:39:09,713 INFO sqlalchemy.engine.Engine RELEASE SAVEPOINT sa_savepoint_1
2023-11-27 11:39:09,713 INFO sqlalchemy.engine.Engine [no key 0.00005s] {}
processing job 14
2023-11-27 11:39:09,713 INFO sqlalchemy.engine.Engine SAVEPOINT sa_savepoint_2
2023-11-27 11:39:09,713 INFO sqlalchemy.engine.Engine [no key 0.00004s] {}
2023-11-27 11:39:09,713 INFO sqlalchemy.engine.Engine UPDATE job SET state=%(state)s WHERE job.id = %(job_id)s
2023-11-27 11:39:09,713 INFO sqlalchemy.engine.Engine [cached since 0.0008041s ago] {'state': 'updated', 'job_id': 14}
2023-11-27 11:39:09,714 INFO sqlalchemy.engine.Engine RELEASE SAVEPOINT sa_savepoint_2
2023-11-27 11:39:09,714 INFO sqlalchemy.engine.Engine [no key 0.00005s] {}
processing job 15
2023-11-27 11:39:09,714 INFO sqlalchemy.engine.Engine COMMIT
Traceback (most recent call last):
File "/home/jdavcs/0dev/drafts/python/sqlalchemy/sa20/demo_nested_transaction_17085.py", line 108, in <module>
run_original_version()
File "/home/jdavcs/0dev/drafts/python/sqlalchemy/sa20/demo_nested_transaction_17085.py", line 55, in run_original_version
change_state(session, job, ERROR_JOB_ID)
File "/home/jdavcs/0dev/drafts/python/sqlalchemy/sa20/demo_nested_transaction_17085.py", line 103, in change_state
raise Exception("MY EXCEPTION") # simulate error
Exception: MY EXCEPTION
# state after running
sa20test5=# select * from job order by id;
id | state
----+---------
13 | updated
14 | updated
15 | new
16 | new
17 | new
(5 rows)
# RESET STATE TO 'new'
sa20test5=# update job set state = 'new';
UPDATE 5
sa20test5=# select * from job order by id;
id | state
----+-------
13 | new
14 | new
15 | new
16 | new
17 | new
(5 rows)
# THEN RUN #17085
2023-11-27 11:44:43,912 INFO sqlalchemy.engine.Engine select pg_catalog.version()
2023-11-27 11:44:43,913 INFO sqlalchemy.engine.Engine [raw sql] {}
2023-11-27 11:44:43,914 INFO sqlalchemy.engine.Engine select current_schema()
2023-11-27 11:44:43,914 INFO sqlalchemy.engine.Engine [raw sql] {}
2023-11-27 11:44:43,914 INFO sqlalchemy.engine.Engine show standard_conforming_strings
2023-11-27 11:44:43,914 INFO sqlalchemy.engine.Engine [raw sql] {}
2023-11-27 11:44:43,914 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2023-11-27 11:44:43,915 INFO sqlalchemy.engine.Engine select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where pg_catalog.pg_table_is_visible(c.oid) and relname=%(name)s
2023-11-27 11:44:43,915 INFO sqlalchemy.engine.Engine [generated in 0.00007s] {'name': 'job'}
2023-11-27 11:44:43,915 INFO sqlalchemy.engine.Engine COMMIT
2023-11-27 11:44:43,915 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2023-11-27 11:44:43,916 INFO sqlalchemy.engine.Engine SELECT job.id, job.state
FROM job ORDER BY job.id
2023-11-27 11:44:43,916 INFO sqlalchemy.engine.Engine [generated in 0.00006s] {}
processing job 13
2023-11-27 11:44:43,918 INFO sqlalchemy.engine.Engine UPDATE job SET state=%(state)s WHERE job.id = %(job_id)s
2023-11-27 11:44:43,918 INFO sqlalchemy.engine.Engine [generated in 0.00006s] {'state': 'updated', 'job_id': 13}
2023-11-27 11:44:43,918 INFO sqlalchemy.engine.Engine COMMIT
2023-11-27 11:44:43,929 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2023-11-27 11:44:43,930 INFO sqlalchemy.engine.Engine SELECT job.id AS job_id, job.state AS job_state
FROM job
WHERE job.id = %(pk_1)s
2023-11-27 11:44:43,930 INFO sqlalchemy.engine.Engine [generated in 0.00008s] {'pk_1': 14}
processing job 14
2023-11-27 11:44:43,931 INFO sqlalchemy.engine.Engine UPDATE job SET state=%(state)s WHERE job.id = %(job_id)s
2023-11-27 11:44:43,931 INFO sqlalchemy.engine.Engine [cached since 0.01305s ago] {'state': 'updated', 'job_id': 14}
2023-11-27 11:44:43,931 INFO sqlalchemy.engine.Engine COMMIT
2023-11-27 11:44:43,934 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2023-11-27 11:44:43,934 INFO sqlalchemy.engine.Engine SELECT job.id AS job_id, job.state AS job_state
FROM job
WHERE job.id = %(pk_1)s
2023-11-27 11:44:43,934 INFO sqlalchemy.engine.Engine [cached since 0.003971s ago] {'pk_1': 15}
ERROR:root:Error while recovering job 15 during application startup.
processing job 15
2023-11-27 11:44:43,935 INFO sqlalchemy.engine.Engine SELECT job.id AS job_id, job.state AS job_state
FROM job
WHERE job.id = %(pk_1)s
INFO:sqlalchemy.engine.Engine:SELECT job.id AS job_id, job.state AS job_state
FROM job
WHERE job.id = %(pk_1)s
2023-11-27 11:44:43,935 INFO sqlalchemy.engine.Engine [cached since 0.004532s ago] {'pk_1': 16}
INFO:sqlalchemy.engine.Engine:[cached since 0.004532s ago] {'pk_1': 16}
processing job 16
2023-11-27 11:44:43,935 INFO sqlalchemy.engine.Engine UPDATE job SET state=%(state)s WHERE job.id = %(job_id)s
INFO:sqlalchemy.engine.Engine:UPDATE job SET state=%(state)s WHERE job.id = %(job_id)s
2023-11-27 11:44:43,935 INFO sqlalchemy.engine.Engine [generated in 0.00006s] ({'state': 'updated', 'job_id': 15}, {'state': 'updated', 'job_id': 16})
INFO:sqlalchemy.engine.Engine:[generated in 0.00006s] ({'state': 'updated', 'job_id': 15}, {'state': 'updated', 'job_id': 16})
2023-11-27 11:44:43,935 INFO sqlalchemy.engine.Engine COMMIT
INFO:sqlalchemy.engine.Engine:COMMIT
2023-11-27 11:44:43,938 INFO sqlalchemy.engine.Engine BEGIN (implicit)
INFO:sqlalchemy.engine.Engine:BEGIN (implicit)
2023-11-27 11:44:43,938 INFO sqlalchemy.engine.Engine SELECT job.id AS job_id, job.state AS job_state
FROM job
WHERE job.id = %(pk_1)s
INFO:sqlalchemy.engine.Engine:SELECT job.id AS job_id, job.state AS job_state
FROM job
WHERE job.id = %(pk_1)s
2023-11-27 11:44:43,938 INFO sqlalchemy.engine.Engine [cached since 0.008437s ago] {'pk_1': 17}
INFO:sqlalchemy.engine.Engine:[cached since 0.008437s ago] {'pk_1': 17}
processing job 17
2023-11-27 11:44:43,939 INFO sqlalchemy.engine.Engine UPDATE job SET state=%(state)s WHERE job.id = %(job_id)s
INFO:sqlalchemy.engine.Engine:UPDATE job SET state=%(state)s WHERE job.id = %(job_id)s
2023-11-27 11:44:43,939 INFO sqlalchemy.engine.Engine [cached since 0.02138s ago] {'state': 'updated', 'job_id': 17}
INFO:sqlalchemy.engine.Engine:[cached since 0.02138s ago] {'state': 'updated', 'job_id': 17}
2023-11-27 11:44:43,939 INFO sqlalchemy.engine.Engine COMMIT
INFO:sqlalchemy.engine.Engine:COMMIT
# result
sa20test5=# select * from job order by id;
id | state
----+---------
13 | updated
14 | updated
15 | updated
16 | updated
17 | updated
(5 rows)
# RESET STATE TO 'new'
sa20test5=# update job set state = 'new';
UPDATE 5
sa20test5=# select * from job order by id;
id | state
----+-------
13 | new
14 | new
15 | new
16 | new
17 | new
(5 rows)
# RUN CORRECTED VERSION:
2023-11-27 11:46:52,834 INFO sqlalchemy.engine.Engine select pg_catalog.version()
2023-11-27 11:46:52,834 INFO sqlalchemy.engine.Engine [raw sql] {}
2023-11-27 11:46:52,835 INFO sqlalchemy.engine.Engine select current_schema()
2023-11-27 11:46:52,835 INFO sqlalchemy.engine.Engine [raw sql] {}
2023-11-27 11:46:52,835 INFO sqlalchemy.engine.Engine show standard_conforming_strings
2023-11-27 11:46:52,835 INFO sqlalchemy.engine.Engine [raw sql] {}
2023-11-27 11:46:52,836 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2023-11-27 11:46:52,836 INFO sqlalchemy.engine.Engine select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where pg_catalog.pg_table_is_visible(c.oid) and relname=%(name)s
2023-11-27 11:46:52,836 INFO sqlalchemy.engine.Engine [generated in 0.00007s] {'name': 'job'}
2023-11-27 11:46:52,836 INFO sqlalchemy.engine.Engine COMMIT
2023-11-27 11:46:52,837 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2023-11-27 11:46:52,838 INFO sqlalchemy.engine.Engine SELECT job.id, job.state
FROM job ORDER BY job.id
2023-11-27 11:46:52,838 INFO sqlalchemy.engine.Engine [generated in 0.00006s] {}
processing job 13
2023-11-27 11:46:52,838 INFO sqlalchemy.engine.Engine SAVEPOINT sa_savepoint_1
2023-11-27 11:46:52,838 INFO sqlalchemy.engine.Engine [no key 0.00005s] {}
2023-11-27 11:46:52,839 INFO sqlalchemy.engine.Engine UPDATE job SET state=%(state)s WHERE job.id = %(job_id)s
2023-11-27 11:46:52,839 INFO sqlalchemy.engine.Engine [generated in 0.00006s] {'state': 'updated', 'job_id': 13}
2023-11-27 11:46:52,839 INFO sqlalchemy.engine.Engine RELEASE SAVEPOINT sa_savepoint_1
2023-11-27 11:46:52,839 INFO sqlalchemy.engine.Engine [no key 0.00004s] {}
processing job 14
2023-11-27 11:46:52,840 INFO sqlalchemy.engine.Engine SAVEPOINT sa_savepoint_2
2023-11-27 11:46:52,840 INFO sqlalchemy.engine.Engine [no key 0.00004s] {}
2023-11-27 11:46:52,840 INFO sqlalchemy.engine.Engine UPDATE job SET state=%(state)s WHERE job.id = %(job_id)s
2023-11-27 11:46:52,840 INFO sqlalchemy.engine.Engine [cached since 0.0007629s ago] {'state': 'updated', 'job_id': 14}
2023-11-27 11:46:52,840 INFO sqlalchemy.engine.Engine RELEASE SAVEPOINT sa_savepoint_2
2023-11-27 11:46:52,840 INFO sqlalchemy.engine.Engine [no key 0.00004s] {}
processing job 15
2023-11-27 11:46:52,841 INFO sqlalchemy.engine.Engine SELECT job.id AS job_id, job.state AS job_state
FROM job
WHERE job.id = %(pk_1)s
2023-11-27 11:46:52,841 INFO sqlalchemy.engine.Engine [generated in 0.00006s] {'pk_1': 15}
ERROR:root:Error while recovering job 15 during application startup.
processing job 16
2023-11-27 11:46:52,841 INFO sqlalchemy.engine.Engine SAVEPOINT sa_savepoint_3
INFO:sqlalchemy.engine.Engine:SAVEPOINT sa_savepoint_3
2023-11-27 11:46:52,841 INFO sqlalchemy.engine.Engine [no key 0.00005s] {}
INFO:sqlalchemy.engine.Engine:[no key 0.00005s] {}
2023-11-27 11:46:52,841 INFO sqlalchemy.engine.Engine UPDATE job SET state=%(state)s WHERE job.id = %(job_id)s
INFO:sqlalchemy.engine.Engine:UPDATE job SET state=%(state)s WHERE job.id = %(job_id)s
2023-11-27 11:46:52,841 INFO sqlalchemy.engine.Engine [cached since 0.002325s ago] {'state': 'updated', 'job_id': 16}
INFO:sqlalchemy.engine.Engine:[cached since 0.002325s ago] {'state': 'updated', 'job_id': 16}
2023-11-27 11:46:52,842 INFO sqlalchemy.engine.Engine RELEASE SAVEPOINT sa_savepoint_3
INFO:sqlalchemy.engine.Engine:RELEASE SAVEPOINT sa_savepoint_3
2023-11-27 11:46:52,842 INFO sqlalchemy.engine.Engine [no key 0.00004s] {}
INFO:sqlalchemy.engine.Engine:[no key 0.00004s] {}
processing job 17
2023-11-27 11:46:52,842 INFO sqlalchemy.engine.Engine SAVEPOINT sa_savepoint_4
INFO:sqlalchemy.engine.Engine:SAVEPOINT sa_savepoint_4
2023-11-27 11:46:52,842 INFO sqlalchemy.engine.Engine [no key 0.00004s] {}
INFO:sqlalchemy.engine.Engine:[no key 0.00004s] {}
2023-11-27 11:46:52,842 INFO sqlalchemy.engine.Engine UPDATE job SET state=%(state)s WHERE job.id = %(job_id)s
INFO:sqlalchemy.engine.Engine:UPDATE job SET state=%(state)s WHERE job.id = %(job_id)s
2023-11-27 11:46:52,842 INFO sqlalchemy.engine.Engine [cached since 0.002891s ago] {'state': 'updated', 'job_id': 17}
INFO:sqlalchemy.engine.Engine:[cached since 0.002891s ago] {'state': 'updated', 'job_id': 17}
2023-11-27 11:46:52,842 INFO sqlalchemy.engine.Engine RELEASE SAVEPOINT sa_savepoint_4
INFO:sqlalchemy.engine.Engine:RELEASE SAVEPOINT sa_savepoint_4
2023-11-27 11:46:52,842 INFO sqlalchemy.engine.Engine [no key 0.00004s] {}
INFO:sqlalchemy.engine.Engine:[no key 0.00004s] {}
2023-11-27 11:46:52,842 INFO sqlalchemy.engine.Engine COMMIT
INFO:sqlalchemy.engine.Engine:COMMIT
# final state
sa20test5=# select * from job order by id;
id | state
----+---------
13 | updated
14 | updated
15 | new
16 | updated
17 | updated
(5 rows)
@mvdbeek
Copy link

mvdbeek commented Nov 27, 2023

https://gist.github.com/jdavcs/1402a63c8afc9150646e809f3d0ae2d9#file-demo-py-L91 presumes we only ever commit once in whatever is being called inside the with statement. That seems very risky:

>>> with sa_session() as session, session.begin():
...     with session.begin_nested():
...         sa_session.execute(update(Job).where(Job.id == 1).values(state="error"))
...         sa_session.commit()
...         sa_session.execute(update(Job).where(Job.id == 1).values(state="ok"))
...         sa_session.commit()
...
<sqlalchemy.engine.cursor.LegacyCursorResult object at 0x121bc4a90>
Traceback (most recent call last):
  File "<stdin>", line 5, in <module>
  File "<string>", line 2, in execute
  File "/Users/mvandenb/src/galaxy/.venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1716, in execute
    conn = self._connection_for_bind(bind)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/mvandenb/src/galaxy/.venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1552, in _connection_for_bind
    TransactionalContext._trans_ctx_check(self)
  File "/Users/mvandenb/src/galaxy/.venv/lib/python3.11/site-packages/sqlalchemy/engine/util.py", line 199, in _trans_ctx_check
    raise exc.InvalidRequestError(
sqlalchemy.exc.InvalidRequestError: Can't operate on closed transaction inside context manager.  Please complete the context manager before emitting further commands.

I don't see the point of the rollback; I don't think it makes sense in this context.

@jdavcs
Copy link
Author

jdavcs commented Nov 27, 2023

OK, I see your point. And yes, we can't commit within the begin block after we have committed once. If it is indeed possible that we may be committing more than once in the course of calling self._check_job_at_startup(job), then that loop design with the nested transaction is completely incorrect.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment