Skip to content

Instantly share code, notes, and snippets.

@leontrolski
Created April 7, 2022 13:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save leontrolski/b3062dd1e97e37e5b9d7a6d6eefa8fe8 to your computer and use it in GitHub Desktop.
Save leontrolski/b3062dd1e97e37e5b9d7a6d6eefa8fe8 to your computer and use it in GitHub Desktop.
testing FOR UPDATE SKIP LOCKED
# pip install sqlalchemy testing.postgresql
# see also: https://blog.crunchydata.com/blog/message-queuing-using-native-postgresql
import threading
from contextlib import contextmanager
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import testing.postgresql
Base = declarative_base()
class Job(Base):
__tablename__ = "job"
job_id = Column(Integer, primary_key=True)
status = Column(String)
@contextmanager
def db_with_one_todo_job():
with testing.postgresql.Postgresql() as postgresql:
engine = create_engine(postgresql.url())
Base.metadata.create_all(engine)
session = sessionmaker(engine)()
session.add(Job(status="todo"))
session.commit()
yield engine
sleep = "AND (SELECT true FROM (SELECT pg_sleep(0.1)) AS _)"
update_sleep = f"""
UPDATE job
SET status = 'began'
WHERE job_id IN (
SELECT job_id FROM JOB
WHERE status = 'todo'
LIMIT 1
)
{sleep}
RETURNING job_id
"""
update_sleep_with_lock = f"""
UPDATE job
SET status = 'began'
WHERE job_id IN (
SELECT job_id FROM JOB
WHERE status = 'todo'
LIMIT 1
FOR UPDATE SKIP LOCKED
)
{sleep}
RETURNING job_id
"""
update_no_sleep = """
UPDATE job
SET status = 'began'
WHERE job_id IN (
SELECT job_id FROM JOB
WHERE status = 'todo'
LIMIT 1
)
RETURNING job_id
"""
with db_with_one_todo_job() as engine:
def delay_at_start():
print("running 1")
qry = engine.execute(update_sleep)
print("1 UPDATEs: ", [job.job_id for job in qry])
def delay_at_end():
print("running 2")
qry = engine.execute(update_no_sleep)
print("2 UPDATEs: ", [job.job_id for job in qry])
thread_1 = threading.Thread(target=delay_at_start)
thread_2 = threading.Thread(target=delay_at_end)
thread_1.start()
thread_2.start()
thread_1.join()
thread_2.join()
print()
with db_with_one_todo_job() as engine:
def delay_at_start():
print("running 1 with FOR UPDATE SKIP LOCKED")
qry = engine.execute(update_sleep_with_lock)
print("1 UPDATEs: ", [job.job_id for job in qry])
def delay_at_end():
print("running 2")
qry = engine.execute(update_no_sleep)
print("2 UPDATEs: ", [job.job_id for job in qry])
thread_1 = threading.Thread(target=delay_at_start)
thread_2 = threading.Thread(target=delay_at_end)
thread_1.start()
thread_2.start()
thread_1.join()
thread_2.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment