Last active
November 27, 2023 18:28
-
-
Save jdavcs/1402a63c8afc9150646e809f3d0ae2d9 to your computer and use it in GitHub Desktop.
nested transactions demo
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import contextlib
import logging

from sqlalchemy import Column, Integer, String, create_engine, select
from sqlalchemy.orm import declarative_base, sessionmaker
#################### setup ####################
# Placeholder: replace with the URL of the database the demo should run
# against, e.g. "postgresql://user:password@localhost/dbname".
# It is kept as a string (rather than the bare expression CONNECTION-STRING,
# which Python reads as a subtraction of two undefined names).
CONNECTION_STRING = "CONNECTION-STRING"
engine = create_engine(CONNECTION_STRING)
Base = declarative_base()
class Job(Base):
    """Minimal ORM model for the demo: one row per job in the "job" table."""

    __tablename__ = "job"

    id = Column(Integer, primary_key=True)
    state = Column(String)  # the demos overwrite this with "updated"
# Create the tables of all models declared on Base using the configured engine.
Base.metadata.create_all(engine)
@contextlib.contextmanager
def transaction(session):
    """Run the ``with`` body inside a transaction on *session*.

    If the session reports that it is already in a transaction, the body runs
    inside that transaction and nothing is begun or ended here. Otherwise
    ``session.begin()`` is entered for the duration of the body, so that
    context manager's own exit handling applies when the body finishes or
    raises.

    Args:
        session: object providing ``in_transaction()`` and ``begin()``.
    """
    if not session.in_transaction():
        with session.begin():
            yield
    else:
        # Reuse the transaction the caller already opened.
        yield
def get_jobs(session):
    """Return the result of ``session.scalars`` for all ``Job`` rows ordered by id."""
    return session.scalars(select(Job).order_by(Job.id))
def get_session():
    """Create and return a new session bound to the module-level engine.

    A fresh ``sessionmaker`` is configured on every call (autoflush and
    autocommit both disabled) and called immediately to produce the session.
    """
    return sessionmaker(bind=engine, autoflush=False, autocommit=False)()
#################### tests ####################
"""For all demos: set ERROR_JOB_ID to simulate an error at a given step in the iteration."""
def run_original_version():
    """
    Process every job inside one outer transaction, with a savepoint per job.

    Expected result:
    - jobs processed before error should be committed
    - errored job: rolled back
    - subsequent jobs: not reached
    """
    with get_session() as session, session.begin():
        try:
            for job in get_jobs(session):
                print(f"processing job {job.id}")
                with session.begin_nested():
                    change_state(session, job, ERROR_JOB_ID)
        finally:
            # Runs even when change_state raised, so the work done before the
            # error is committed; the exception itself is not handled here and
            # still propagates to the caller.
            session.commit()
def run_17085():
    """
    Process every job without savepoints, logging errors and moving on.

    Expected result:
    - jobs processed before error should be committed
    - errored job: NOT rolled back
    - subsequent jobs: committed (in the recorded run, the errored job's
      pending change was written out together with the next job's update)
    Ref: #17085
    """
    with get_session() as session:
        for job in get_jobs(session):
            print(f"processing job {job.id}")
            try:
                change_state(session, job, ERROR_JOB_ID)
            except Exception:
                # Deliberately swallowed so the loop continues with the next
                # job. change_state assigns job.state before raising, so that
                # change stays pending on the session.
                logging.error("Error while recovering job %s during application startup.", job.id)
        # Final commit after all jobs have been visited.
        with transaction(session):
            session.commit()
def run_corrected():
    """
    Process every job in one outer transaction, isolating each job in its own
    savepoint and catching errors per job.

    Expected result:
    - jobs processed before error should be committed
    - errored job: rolled back (only its own savepoint); the error is logged
    - subsequent jobs: committed
    """
    with get_session() as session, session.begin():
        try:
            for job in get_jobs(session):
                print(f"processing job {job.id}")
                try:
                    with session.begin_nested():
                        change_state(session, job, ERROR_JOB_ID)
                except Exception:
                    # The try wraps the savepoint block, so the exception has
                    # already left begin_nested() when it is caught here and
                    # the loop can continue with the next job.
                    logging.error("Error while recovering job %s during application startup.", job.id)
        finally:
            session.commit()
def change_state(session, job, error_id=None):
    """Set ``job.state`` to "updated" and commit, optionally failing first.

    Args:
        session: session used for the transaction and the commit.
        job: object with ``id`` and ``state`` attributes.
        error_id: when equal to ``job.id``, an exception is raised after the
            state has been assigned but before the commit is issued.

    Raises:
        Exception: with message "MY EXCEPTION" when ``job.id == error_id``.
    """
    with transaction(session):
        job.state = "updated"
        if job.id == error_id:
            raise Exception("MY EXCEPTION")  # simulate error
        session.commit()
# Id of the job whose processing should fail; read by all run_* demos.
ERROR_JOB_ID = 15

# Uncomment the demo to run.
#run_original_version()
#run_17085()
run_corrected()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# state before running | |
sa20test5=# select * from job order by id; | |
id | state | |
----+------- | |
13 | new | |
14 | new | |
15 | new | |
16 | new | |
17 | new | |
(5 rows) | |
# run original | |
(.venv_sa14) rivendell$ python demo_nested_transaction_17085.py | |
2023-11-27 11:39:09,707 INFO sqlalchemy.engine.Engine select pg_catalog.version() | |
2023-11-27 11:39:09,707 INFO sqlalchemy.engine.Engine [raw sql] {} | |
2023-11-27 11:39:09,709 INFO sqlalchemy.engine.Engine select current_schema() | |
2023-11-27 11:39:09,709 INFO sqlalchemy.engine.Engine [raw sql] {} | |
2023-11-27 11:39:09,709 INFO sqlalchemy.engine.Engine show standard_conforming_strings | |
2023-11-27 11:39:09,709 INFO sqlalchemy.engine.Engine [raw sql] {} | |
2023-11-27 11:39:09,709 INFO sqlalchemy.engine.Engine BEGIN (implicit) | |
2023-11-27 11:39:09,709 INFO sqlalchemy.engine.Engine select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where pg_catalog.pg_table_is_visible(c.oid) and relname=%(name)s | |
2023-11-27 11:39:09,710 INFO sqlalchemy.engine.Engine [generated in 0.00008s] {'name': 'job'} | |
2023-11-27 11:39:09,710 INFO sqlalchemy.engine.Engine COMMIT | |
2023-11-27 11:39:09,710 INFO sqlalchemy.engine.Engine BEGIN (implicit) | |
2023-11-27 11:39:09,711 INFO sqlalchemy.engine.Engine SELECT job.id, job.state | |
FROM job ORDER BY job.id | |
2023-11-27 11:39:09,711 INFO sqlalchemy.engine.Engine [generated in 0.00006s] {} | |
processing job 13 | |
2023-11-27 11:39:09,712 INFO sqlalchemy.engine.Engine SAVEPOINT sa_savepoint_1 | |
2023-11-27 11:39:09,712 INFO sqlalchemy.engine.Engine [no key 0.00005s] {} | |
2023-11-27 11:39:09,713 INFO sqlalchemy.engine.Engine UPDATE job SET state=%(state)s WHERE job.id = %(job_id)s | |
2023-11-27 11:39:09,713 INFO sqlalchemy.engine.Engine [generated in 0.00007s] {'state': 'updated', 'job_id': 13} | |
2023-11-27 11:39:09,713 INFO sqlalchemy.engine.Engine RELEASE SAVEPOINT sa_savepoint_1 | |
2023-11-27 11:39:09,713 INFO sqlalchemy.engine.Engine [no key 0.00005s] {} | |
processing job 14 | |
2023-11-27 11:39:09,713 INFO sqlalchemy.engine.Engine SAVEPOINT sa_savepoint_2 | |
2023-11-27 11:39:09,713 INFO sqlalchemy.engine.Engine [no key 0.00004s] {} | |
2023-11-27 11:39:09,713 INFO sqlalchemy.engine.Engine UPDATE job SET state=%(state)s WHERE job.id = %(job_id)s | |
2023-11-27 11:39:09,713 INFO sqlalchemy.engine.Engine [cached since 0.0008041s ago] {'state': 'updated', 'job_id': 14} | |
2023-11-27 11:39:09,714 INFO sqlalchemy.engine.Engine RELEASE SAVEPOINT sa_savepoint_2 | |
2023-11-27 11:39:09,714 INFO sqlalchemy.engine.Engine [no key 0.00005s] {} | |
processing job 15 | |
2023-11-27 11:39:09,714 INFO sqlalchemy.engine.Engine COMMIT | |
Traceback (most recent call last): | |
File "/home/jdavcs/0dev/drafts/python/sqlalchemy/sa20/demo_nested_transaction_17085.py", line 108, in <module> | |
run_original_version() | |
File "/home/jdavcs/0dev/drafts/python/sqlalchemy/sa20/demo_nested_transaction_17085.py", line 55, in run_original_version | |
change_state(session, job, ERROR_JOB_ID) | |
File "/home/jdavcs/0dev/drafts/python/sqlalchemy/sa20/demo_nested_transaction_17085.py", line 103, in change_state | |
raise Exception("MY EXCEPTION") # simulate error | |
Exception: MY EXCEPTION | |
# state after running | |
sa20test5=# select * from job order by id; | |
id | state | |
----+--------- | |
13 | updated | |
14 | updated | |
15 | new | |
16 | new | |
17 | new | |
(5 rows) | |
# RESET STATE TO 'new' | |
sa20test5=# update job set state = 'new'; | |
UPDATE 5 | |
sa20test5=# select * from job order by id; | |
id | state | |
----+------- | |
13 | new | |
14 | new | |
15 | new | |
16 | new | |
17 | new | |
(5 rows) | |
# THEN RUN #17085 | |
2023-11-27 11:44:43,912 INFO sqlalchemy.engine.Engine select pg_catalog.version() | |
2023-11-27 11:44:43,913 INFO sqlalchemy.engine.Engine [raw sql] {} | |
2023-11-27 11:44:43,914 INFO sqlalchemy.engine.Engine select current_schema() | |
2023-11-27 11:44:43,914 INFO sqlalchemy.engine.Engine [raw sql] {} | |
2023-11-27 11:44:43,914 INFO sqlalchemy.engine.Engine show standard_conforming_strings | |
2023-11-27 11:44:43,914 INFO sqlalchemy.engine.Engine [raw sql] {} | |
2023-11-27 11:44:43,914 INFO sqlalchemy.engine.Engine BEGIN (implicit) | |
2023-11-27 11:44:43,915 INFO sqlalchemy.engine.Engine select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where pg_catalog.pg_table_is_visible(c.oid) and relname=%(name)s | |
2023-11-27 11:44:43,915 INFO sqlalchemy.engine.Engine [generated in 0.00007s] {'name': 'job'} | |
2023-11-27 11:44:43,915 INFO sqlalchemy.engine.Engine COMMIT | |
2023-11-27 11:44:43,915 INFO sqlalchemy.engine.Engine BEGIN (implicit) | |
2023-11-27 11:44:43,916 INFO sqlalchemy.engine.Engine SELECT job.id, job.state | |
FROM job ORDER BY job.id | |
2023-11-27 11:44:43,916 INFO sqlalchemy.engine.Engine [generated in 0.00006s] {} | |
processing job 13 | |
2023-11-27 11:44:43,918 INFO sqlalchemy.engine.Engine UPDATE job SET state=%(state)s WHERE job.id = %(job_id)s | |
2023-11-27 11:44:43,918 INFO sqlalchemy.engine.Engine [generated in 0.00006s] {'state': 'updated', 'job_id': 13} | |
2023-11-27 11:44:43,918 INFO sqlalchemy.engine.Engine COMMIT | |
2023-11-27 11:44:43,929 INFO sqlalchemy.engine.Engine BEGIN (implicit) | |
2023-11-27 11:44:43,930 INFO sqlalchemy.engine.Engine SELECT job.id AS job_id, job.state AS job_state | |
FROM job | |
WHERE job.id = %(pk_1)s | |
2023-11-27 11:44:43,930 INFO sqlalchemy.engine.Engine [generated in 0.00008s] {'pk_1': 14} | |
processing job 14 | |
2023-11-27 11:44:43,931 INFO sqlalchemy.engine.Engine UPDATE job SET state=%(state)s WHERE job.id = %(job_id)s | |
2023-11-27 11:44:43,931 INFO sqlalchemy.engine.Engine [cached since 0.01305s ago] {'state': 'updated', 'job_id': 14} | |
2023-11-27 11:44:43,931 INFO sqlalchemy.engine.Engine COMMIT | |
2023-11-27 11:44:43,934 INFO sqlalchemy.engine.Engine BEGIN (implicit) | |
2023-11-27 11:44:43,934 INFO sqlalchemy.engine.Engine SELECT job.id AS job_id, job.state AS job_state | |
FROM job | |
WHERE job.id = %(pk_1)s | |
2023-11-27 11:44:43,934 INFO sqlalchemy.engine.Engine [cached since 0.003971s ago] {'pk_1': 15} | |
ERROR:root:Error while recovering job 15 during application startup. | |
processing job 15 | |
2023-11-27 11:44:43,935 INFO sqlalchemy.engine.Engine SELECT job.id AS job_id, job.state AS job_state | |
FROM job | |
WHERE job.id = %(pk_1)s | |
INFO:sqlalchemy.engine.Engine:SELECT job.id AS job_id, job.state AS job_state | |
FROM job | |
WHERE job.id = %(pk_1)s | |
2023-11-27 11:44:43,935 INFO sqlalchemy.engine.Engine [cached since 0.004532s ago] {'pk_1': 16} | |
INFO:sqlalchemy.engine.Engine:[cached since 0.004532s ago] {'pk_1': 16} | |
processing job 16 | |
2023-11-27 11:44:43,935 INFO sqlalchemy.engine.Engine UPDATE job SET state=%(state)s WHERE job.id = %(job_id)s | |
INFO:sqlalchemy.engine.Engine:UPDATE job SET state=%(state)s WHERE job.id = %(job_id)s | |
2023-11-27 11:44:43,935 INFO sqlalchemy.engine.Engine [generated in 0.00006s] ({'state': 'updated', 'job_id': 15}, {'state': 'updated', 'job_id': 16}) | |
INFO:sqlalchemy.engine.Engine:[generated in 0.00006s] ({'state': 'updated', 'job_id': 15}, {'state': 'updated', 'job_id': 16}) | |
2023-11-27 11:44:43,935 INFO sqlalchemy.engine.Engine COMMIT | |
INFO:sqlalchemy.engine.Engine:COMMIT | |
2023-11-27 11:44:43,938 INFO sqlalchemy.engine.Engine BEGIN (implicit) | |
INFO:sqlalchemy.engine.Engine:BEGIN (implicit) | |
2023-11-27 11:44:43,938 INFO sqlalchemy.engine.Engine SELECT job.id AS job_id, job.state AS job_state | |
FROM job | |
WHERE job.id = %(pk_1)s | |
INFO:sqlalchemy.engine.Engine:SELECT job.id AS job_id, job.state AS job_state | |
FROM job | |
WHERE job.id = %(pk_1)s | |
2023-11-27 11:44:43,938 INFO sqlalchemy.engine.Engine [cached since 0.008437s ago] {'pk_1': 17} | |
INFO:sqlalchemy.engine.Engine:[cached since 0.008437s ago] {'pk_1': 17} | |
processing job 17 | |
2023-11-27 11:44:43,939 INFO sqlalchemy.engine.Engine UPDATE job SET state=%(state)s WHERE job.id = %(job_id)s | |
INFO:sqlalchemy.engine.Engine:UPDATE job SET state=%(state)s WHERE job.id = %(job_id)s | |
2023-11-27 11:44:43,939 INFO sqlalchemy.engine.Engine [cached since 0.02138s ago] {'state': 'updated', 'job_id': 17} | |
INFO:sqlalchemy.engine.Engine:[cached since 0.02138s ago] {'state': 'updated', 'job_id': 17} | |
2023-11-27 11:44:43,939 INFO sqlalchemy.engine.Engine COMMIT | |
INFO:sqlalchemy.engine.Engine:COMMIT | |
# result | |
sa20test5=# select * from job order by id; | |
id | state | |
----+--------- | |
13 | updated | |
14 | updated | |
15 | updated | |
16 | updated | |
17 | updated | |
(5 rows) | |
# RESET STATE TO 'new' | |
sa20test5=# update job set state = 'new'; | |
UPDATE 5 | |
sa20test5=# select * from job order by id; | |
id | state | |
----+------- | |
13 | new | |
14 | new | |
15 | new | |
16 | new | |
17 | new | |
(5 rows) | |
# RUN CORRECTED VERSION: | |
2023-11-27 11:46:52,834 INFO sqlalchemy.engine.Engine select pg_catalog.version() | |
2023-11-27 11:46:52,834 INFO sqlalchemy.engine.Engine [raw sql] {} | |
2023-11-27 11:46:52,835 INFO sqlalchemy.engine.Engine select current_schema() | |
2023-11-27 11:46:52,835 INFO sqlalchemy.engine.Engine [raw sql] {} | |
2023-11-27 11:46:52,835 INFO sqlalchemy.engine.Engine show standard_conforming_strings | |
2023-11-27 11:46:52,835 INFO sqlalchemy.engine.Engine [raw sql] {} | |
2023-11-27 11:46:52,836 INFO sqlalchemy.engine.Engine BEGIN (implicit) | |
2023-11-27 11:46:52,836 INFO sqlalchemy.engine.Engine select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where pg_catalog.pg_table_is_visible(c.oid) and relname=%(name)s | |
2023-11-27 11:46:52,836 INFO sqlalchemy.engine.Engine [generated in 0.00007s] {'name': 'job'} | |
2023-11-27 11:46:52,836 INFO sqlalchemy.engine.Engine COMMIT | |
2023-11-27 11:46:52,837 INFO sqlalchemy.engine.Engine BEGIN (implicit) | |
2023-11-27 11:46:52,838 INFO sqlalchemy.engine.Engine SELECT job.id, job.state | |
FROM job ORDER BY job.id | |
2023-11-27 11:46:52,838 INFO sqlalchemy.engine.Engine [generated in 0.00006s] {} | |
processing job 13 | |
2023-11-27 11:46:52,838 INFO sqlalchemy.engine.Engine SAVEPOINT sa_savepoint_1 | |
2023-11-27 11:46:52,838 INFO sqlalchemy.engine.Engine [no key 0.00005s] {} | |
2023-11-27 11:46:52,839 INFO sqlalchemy.engine.Engine UPDATE job SET state=%(state)s WHERE job.id = %(job_id)s | |
2023-11-27 11:46:52,839 INFO sqlalchemy.engine.Engine [generated in 0.00006s] {'state': 'updated', 'job_id': 13} | |
2023-11-27 11:46:52,839 INFO sqlalchemy.engine.Engine RELEASE SAVEPOINT sa_savepoint_1 | |
2023-11-27 11:46:52,839 INFO sqlalchemy.engine.Engine [no key 0.00004s] {} | |
processing job 14 | |
2023-11-27 11:46:52,840 INFO sqlalchemy.engine.Engine SAVEPOINT sa_savepoint_2 | |
2023-11-27 11:46:52,840 INFO sqlalchemy.engine.Engine [no key 0.00004s] {} | |
2023-11-27 11:46:52,840 INFO sqlalchemy.engine.Engine UPDATE job SET state=%(state)s WHERE job.id = %(job_id)s | |
2023-11-27 11:46:52,840 INFO sqlalchemy.engine.Engine [cached since 0.0007629s ago] {'state': 'updated', 'job_id': 14} | |
2023-11-27 11:46:52,840 INFO sqlalchemy.engine.Engine RELEASE SAVEPOINT sa_savepoint_2 | |
2023-11-27 11:46:52,840 INFO sqlalchemy.engine.Engine [no key 0.00004s] {} | |
processing job 15 | |
2023-11-27 11:46:52,841 INFO sqlalchemy.engine.Engine SELECT job.id AS job_id, job.state AS job_state | |
FROM job | |
WHERE job.id = %(pk_1)s | |
2023-11-27 11:46:52,841 INFO sqlalchemy.engine.Engine [generated in 0.00006s] {'pk_1': 15} | |
ERROR:root:Error while recovering job 15 during application startup. | |
processing job 16 | |
2023-11-27 11:46:52,841 INFO sqlalchemy.engine.Engine SAVEPOINT sa_savepoint_3 | |
INFO:sqlalchemy.engine.Engine:SAVEPOINT sa_savepoint_3 | |
2023-11-27 11:46:52,841 INFO sqlalchemy.engine.Engine [no key 0.00005s] {} | |
INFO:sqlalchemy.engine.Engine:[no key 0.00005s] {} | |
2023-11-27 11:46:52,841 INFO sqlalchemy.engine.Engine UPDATE job SET state=%(state)s WHERE job.id = %(job_id)s | |
INFO:sqlalchemy.engine.Engine:UPDATE job SET state=%(state)s WHERE job.id = %(job_id)s | |
2023-11-27 11:46:52,841 INFO sqlalchemy.engine.Engine [cached since 0.002325s ago] {'state': 'updated', 'job_id': 16} | |
INFO:sqlalchemy.engine.Engine:[cached since 0.002325s ago] {'state': 'updated', 'job_id': 16} | |
2023-11-27 11:46:52,842 INFO sqlalchemy.engine.Engine RELEASE SAVEPOINT sa_savepoint_3 | |
INFO:sqlalchemy.engine.Engine:RELEASE SAVEPOINT sa_savepoint_3 | |
2023-11-27 11:46:52,842 INFO sqlalchemy.engine.Engine [no key 0.00004s] {} | |
INFO:sqlalchemy.engine.Engine:[no key 0.00004s] {} | |
processing job 17 | |
2023-11-27 11:46:52,842 INFO sqlalchemy.engine.Engine SAVEPOINT sa_savepoint_4 | |
INFO:sqlalchemy.engine.Engine:SAVEPOINT sa_savepoint_4 | |
2023-11-27 11:46:52,842 INFO sqlalchemy.engine.Engine [no key 0.00004s] {} | |
INFO:sqlalchemy.engine.Engine:[no key 0.00004s] {} | |
2023-11-27 11:46:52,842 INFO sqlalchemy.engine.Engine UPDATE job SET state=%(state)s WHERE job.id = %(job_id)s | |
INFO:sqlalchemy.engine.Engine:UPDATE job SET state=%(state)s WHERE job.id = %(job_id)s | |
2023-11-27 11:46:52,842 INFO sqlalchemy.engine.Engine [cached since 0.002891s ago] {'state': 'updated', 'job_id': 17} | |
INFO:sqlalchemy.engine.Engine:[cached since 0.002891s ago] {'state': 'updated', 'job_id': 17} | |
2023-11-27 11:46:52,842 INFO sqlalchemy.engine.Engine RELEASE SAVEPOINT sa_savepoint_4 | |
INFO:sqlalchemy.engine.Engine:RELEASE SAVEPOINT sa_savepoint_4 | |
2023-11-27 11:46:52,842 INFO sqlalchemy.engine.Engine [no key 0.00004s] {} | |
INFO:sqlalchemy.engine.Engine:[no key 0.00004s] {} | |
2023-11-27 11:46:52,842 INFO sqlalchemy.engine.Engine COMMIT | |
INFO:sqlalchemy.engine.Engine:COMMIT | |
# final state | |
sa20test5=# select * from job order by id; | |
id | state | |
----+--------- | |
13 | updated | |
14 | updated | |
15 | new | |
16 | updated | |
17 | updated | |
(5 rows) | |
OK, I see your point. And yes, we can't commit within the `begin` block after we have committed once. If it is indeed possible that we may be committing more than once in the course of calling `self._check_job_at_startup(job)`, then that loop design with the nested transaction is completely incorrect.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
https://gist.github.com/jdavcs/1402a63c8afc9150646e809f3d0ae2d9#file-demo-py-L91 presumes we only ever commit once in whatever is being called inside the with statement. That seems very risky:
I don't see the point of the rollback; I don't think it makes sense in this context.