@purcell
Last active March 19, 2021 08:35
Easy task queues using PostgreSQL
-- Let's say you have a table full of work:
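-- (gen_random_uuid() is built in from PostgreSQL 13; on older versions
-- it comes from the pgcrypto extension.)
CREATE TABLE tasks (
    id         UUID PRIMARY KEY NOT NULL DEFAULT gen_random_uuid(),
    status     TEXT NOT NULL DEFAULT 'pending',
    payload    JSON NOT NULL, -- or just have meaningful columns!
    created_at TIMESTAMP NOT NULL DEFAULT NOW()
);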
ALTER TABLE tasks ADD CHECK (status IN ('pending', 'done', 'error'));
INSERT INTO tasks (payload) VALUES ('{ "action": "feed", "animal": "panda", "food": "kaiserschmarrn" }');
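-- A possible optimisation, not part of the original recipe: a partial
-- index covering only pending rows should keep the polling query below
-- cheap as finished tasks pile up. The index name is made up for this
-- sketch.
CREATE INDEX tasks_pending_created_at_idx
    ON tasks (created_at)
 WHERE status = 'pending';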
-- To find pending work by polling, run this query inside a
-- transaction:
BEGIN;
SELECT *
  FROM tasks
 WHERE status = 'pending'
 ORDER BY created_at ASC
 LIMIT 1
   FOR UPDATE SKIP LOCKED;
-- ^ This query requires PostgreSQL 9.5+
-- Now you have the info in that row, and the row stays locked until
-- the transaction completes: other workers running the same query
-- will skip it (plain SELECTs can still read it). So you do the
-- work, and then:
UPDATE tasks SET status = 'done' WHERE id = the_task_id;
COMMIT;
-- The important thing is to decide on the error handling strategy.
-- If an error happens, the transaction will generally be rolled back,
-- which releases the lock and leaves the task 'pending', so somebody
-- else (you again?) will pick up the same job. That can be
-- problematic, so it's often best to do the work using a separate
-- connection and transaction. Then if that work explodes, you can
-- catch the error and instead execute:
UPDATE tasks SET status = 'error' WHERE id = the_task_id;
-- (and optionally save the failure info in another table or an evil
-- nullable column)
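-- One way to save the failure info, sketched under assumptions: the
-- task_failures table and its columns are invented for this example,
-- and the_task_id is the same placeholder as above.
CREATE TABLE task_failures (
    task_id   UUID NOT NULL REFERENCES tasks (id),
    message   TEXT NOT NULL,
    failed_at TIMESTAMP NOT NULL DEFAULT NOW()
);
INSERT INTO task_failures (task_id, message)
VALUES (the_task_id, 'panda refused the kaiserschmarrn');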
-- Now, I mentioned this was for polling. You can reduce/eliminate
-- polling by using a long-running connection and using LISTEN/NOTIFY
-- (https://www.postgresql.org/docs/9.0/sql-notify.html) on a
-- channel. The code responsible for creating tasks would call
-- pg_notify after inserting a new task. The worker code would use
-- LISTEN, and then it would perform the above query to obtain and
-- lock a task.
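-- A minimal sketch of that flow. The channel name new_task is an
-- assumption; any identifier works as long as both sides agree.
-- In the worker's long-running connection:
LISTEN new_task;
-- In the code that creates tasks:
BEGIN;
INSERT INTO tasks (payload) VALUES ('{ "action": "feed", "animal": "panda", "food": "bamboo" }');
SELECT pg_notify('new_task', '');
COMMIT;
-- Notifications are delivered only when the sending transaction
-- commits, and only to sessions that are listening at that moment, so
-- a worker should still run the polling query once on startup to pick
-- up tasks created while it was disconnected.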
purcell commented Sep 19, 2019

Check out using SAVEPOINTs to deal with the error handling: https://gist.github.com/krasio/761da3829988f3be1e371578ec6cef40
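
For readers who want the general shape of that idea, here is a rough sketch of my own. It is not copied from the linked gist, and `before_work` is a made-up savepoint name. It assumes the work is done with SQL statements inside the same transaction that locked the task; `the_task_id` is the same placeholder as in the gist.

```sql
BEGIN;
SELECT *
  FROM tasks
 WHERE status = 'pending'
 ORDER BY created_at ASC
 LIMIT 1
   FOR UPDATE SKIP LOCKED;
SAVEPOINT before_work;
-- ... do the work here; if one of its statements fails, run:
ROLLBACK TO SAVEPOINT before_work;
-- The row lock was taken before the savepoint, so it is still held
-- and the task can be marked as failed in the same transaction.
UPDATE tasks SET status = 'error' WHERE id = the_task_id;
COMMIT;
```

On success you would skip the `ROLLBACK TO SAVEPOINT` and set the status to 'done' instead.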
