Created
April 7, 2022 13:45
-
-
Save leontrolski/b3062dd1e97e37e5b9d7a6d6eefa8fe8 to your computer and use it in GitHub Desktop.
testing FOR UPDATE SKIP LOCKED
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# pip install sqlalchemy testing.postgresql
# see also: https://blog.crunchydata.com/blog/message-queuing-using-native-postgresql
import threading | |
from contextlib import contextmanager | |
from sqlalchemy import Column, Integer, String, create_engine | |
from sqlalchemy.ext.declarative import declarative_base | |
from sqlalchemy.orm import sessionmaker | |
import testing.postgresql | |
# Declarative base shared by every ORM model in this script.
Base = declarative_base()


class Job(Base):
    """ORM mapping for the ``job`` table, one row per unit of work.

    The script seeds rows with status ``"todo"``; the UPDATE statements
    further down switch a claimed row to ``"began"``.
    """

    __tablename__ = "job"

    # Integer primary key.
    job_id = Column(Integer, primary_key=True)
    # Free-form state label ("todo" / "began" in this script).
    status = Column(String)
@contextmanager
def db_with_one_todo_job():
    """Yield an engine for a throwaway database holding a single 'todo' job.

    A ``testing.postgresql.Postgresql`` instance is started for the duration
    of the ``with`` block, the schema from ``Base.metadata`` is created in
    it, and one ``Job(status="todo")`` row is committed before the engine is
    handed to the caller.

    Yields:
        The SQLAlchemy engine connected to the temporary database.
    """
    with testing.postgresql.Postgresql() as postgresql:
        engine = create_engine(postgresql.url())
        try:
            Base.metadata.create_all(engine)
            session = sessionmaker(engine)()
            try:
                session.add(Job(status="todo"))
                session.commit()
            finally:
                # Release the seeding session's resources even if the
                # insert or commit fails.
                session.close()
            yield engine
        finally:
            # Close every pooled connection before the temporary server is
            # torn down, so no client connection is left dangling.
            engine.dispose()
# Extra WHERE predicate that is always true but calls pg_sleep(0.1) on the
# way, i.e. it makes the statement it is appended to pause for 0.1 seconds.
sleep = "AND (SELECT true FROM (SELECT pg_sleep(0.1)) AS _)"


def _claim_job_sql(*, skip_locked=False, delay=""):
    """Build the UPDATE that claims one 'todo' job and returns its id.

    Args:
        skip_locked: When true, the inner SELECT gets a
            ``FOR UPDATE SKIP LOCKED`` clause.
        delay: Optional extra predicate appended after the ``IN (...)``
            condition (e.g. the module-level ``sleep`` fragment).

    Returns:
        The SQL text. The variants differ only in the two optional
        fragments, so they cannot drift apart by copy-paste.
    """
    lock_clause = "FOR UPDATE SKIP LOCKED" if skip_locked else ""
    return f"""
        UPDATE job
        SET status = 'began'
        WHERE job_id IN (
            SELECT job_id FROM JOB
            WHERE status = 'todo'
            LIMIT 1
            {lock_clause}
        )
        {delay}
        RETURNING job_id
    """


# Claim a job with the sleep predicate, no row locking in the subselect.
update_sleep = _claim_job_sql(delay=sleep)
# Same as above, but the subselect uses FOR UPDATE SKIP LOCKED.
update_sleep_with_lock = _claim_job_sql(skip_locked=True, delay=sleep)
# Plain claim: no sleep predicate and no locking clause.
update_no_sleep = _claim_job_sql()
# Experiment 1: two threads race to claim the single seeded job. Thread 1
# runs the UPDATE that carries the pg_sleep predicate (no locking clause);
# thread 2 runs the same UPDATE without the sleep. Each thread prints the
# job ids its statement returned.
with db_with_one_todo_job() as engine:

    def delay_at_start():
        """Run the sleeping UPDATE and print the job ids it returned."""
        print("running 1")
        result = engine.execute(update_sleep)
        print("1 UPDATEs: ", [row.job_id for row in result])

    def delay_at_end():
        """Run the plain UPDATE and print the job ids it returned."""
        print("running 2")
        result = engine.execute(update_no_sleep)
        print("2 UPDATEs: ", [row.job_id for row in result])

    thread_1 = threading.Thread(target=delay_at_start)
    thread_2 = threading.Thread(target=delay_at_end)
    # Start both before joining either so the two statements run concurrently.
    for thread in (thread_1, thread_2):
        thread.start()
    for thread in (thread_1, thread_2):
        thread.join()

# Blank line separating the output of the two experiments.
print()
# Experiment 2: same race against a fresh database, but thread 1's UPDATE
# selects its row with FOR UPDATE SKIP LOCKED (and still carries the
# pg_sleep predicate); thread 2 again runs the plain UPDATE.
with db_with_one_todo_job() as engine:

    def delay_at_start():
        """Run the sleeping, row-locking UPDATE and print the ids returned."""
        print("running 1 with FOR UPDATE SKIP LOCKED")
        result = engine.execute(update_sleep_with_lock)
        print("1 UPDATEs: ", [row.job_id for row in result])

    def delay_at_end():
        """Run the plain UPDATE and print the job ids it returned."""
        print("running 2")
        result = engine.execute(update_no_sleep)
        print("2 UPDATEs: ", [row.job_id for row in result])

    thread_1 = threading.Thread(target=delay_at_start)
    thread_2 = threading.Thread(target=delay_at_end)
    # Start both before joining either so the two statements run concurrently.
    for thread in (thread_1, thread_2):
        thread.start()
    for thread in (thread_1, thread_2):
        thread.join()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment