Skip to content

Instantly share code, notes, and snippets.

@croaky
Last active April 17, 2021 17:27
Show Gist options
  • Save croaky/a0e26fb9a4b930b84d98a98935b42e78 to your computer and use it in GitHub Desktop.
Save croaky/a0e26fb9a4b930b84d98a98935b42e78 to your computer and use it in GitHub Desktop.
Job queues in Ruby and Postgres

A few lines of Ruby using the `pg` driver are a simple alternative to a job-queuing library. Each job queue is defined as a database table, and each queue's worker is a single Ruby file, declared as a process type in the Procfile:

queuea: bundle exec ruby queue/a.rb
queueb: bundle exec ruby queue/b.rb

On Heroku, each queue runs as its own process type, so the number of workers for each queue can be scaled independently:

heroku ps:scale queuea=1,queueb=2

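Depending on the queue's requirements, either polling or pub/sub may be appropriate. The first worker below polls the table on a fixed interval; the second uses Postgres `LISTEN`/`NOTIFY` so it is woken as soon as a job is inserted.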

require "pg"
require_relative "job_one"
require_relative "job_two"
# Runs one job row through its handler and returns the status string to store.
#
# Handlers are selected by the row's job_name. An unknown name is recorded as
# an "invalid job name" status instead of being run. An exception raised by a
# handler is also recorded as the status, so one bad job cannot crash the
# worker and then be picked up again on every restart.
def run_job(job)
  case job["job_name"]
  when "JobOne" then JobOne.call(job["data"])
  when "JobTwo" then JobTwo.call(job["data"])
  else "invalid job name: #{job["job_name"]}"
  end
rescue StandardError => e
  "error: #{e.class}: #{e.message}"
end

# Claims the oldest pending job, runs it, and records its status and worked_at.
#
# The row stays locked (FOR UPDATE SKIP LOCKED) until the transaction commits,
# so several workers polling the same table never run the same job twice.
# Only 'pending' rows are considered, so finished jobs are not re-run.
#
# @param conn [PG::Connection]
# @return [Boolean] true if a job was processed, false if none was available
def work_next_job(conn)
  conn.transaction do |tx|
    job = tx.exec(<<~SQL).first
      SELECT id, job_name, data
      FROM job_queue
      WHERE status = 'pending'
      ORDER BY created_at ASC
      LIMIT 1
      FOR UPDATE SKIP LOCKED
    SQL
    next false unless job

    started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    status = run_job(job)
    tx.exec_params(<<~SQL, [status, job["id"]])
      UPDATE job_queue
      SET status = $1, worked_at = now()
      WHERE id = $2
    SQL
    elapsed = (Process.clock_gettime(Process::CLOCK_MONOTONIC) - started).round(2)
    puts "#{elapsed}s job #{job["id"]}: #{status}"
    true
  end
end

begin
  conn = PG.connect(ENV.fetch("DATABASE_URL"))
  # IF NOT EXISTS so a restarted worker does not fail on an existing table.
  conn.exec <<~SQL
    CREATE TABLE IF NOT EXISTS job_queue (
      id bigint NOT NULL,
      created_at timestamp DEFAULT now() NOT NULL,
      status text DEFAULT 'pending'::text NOT NULL,
      job_name text NOT NULL,
      data jsonb NOT NULL,
      worked_at timestamp
    );
  SQL
  puts "Waiting for work..."
  loop do
    sleep 10 # poll for new jobs
    # Drain every pending job before sleeping again.
    loop { break unless work_next_job(conn) }
  end
ensure
  conn&.close
end
require "pg"
require_relative "job_one"
require_relative "job_two"
# Runs one job row through its handler and returns the status string to store.
#
# Handlers are selected by the row's job_name. An unknown name is recorded as
# an "invalid job name" status instead of being run. An exception raised by a
# handler is also recorded as the status, so one bad job cannot crash the
# worker.
def run_job(job)
  case job["job_name"]
  when "JobOne" then JobOne.call(job["data"])
  when "JobTwo" then JobTwo.call(job["data"])
  else "invalid job name: #{job["job_name"]}"
  end
rescue StandardError => e
  "error: #{e.class}: #{e.message}"
end

# Claims a pending job, runs it, and records its status and worked_at.
#
# With job_id, only that job is considered. It is skipped if it is no longer
# 'pending' or another worker currently holds its row lock. Without job_id,
# the oldest pending job is taken. FOR UPDATE SKIP LOCKED keeps several
# workers that all received the same NOTIFY from running the job twice.
#
# @param conn [PG::Connection]
# @param job_id [String, Integer, nil] notification payload (the job's id)
# @return [Boolean] true if a job was processed, false otherwise
def work_pending_job(conn, job_id = nil)
  conn.transaction do |tx|
    job = tx.exec_params(<<~SQL, [job_id]).first
      SELECT id, job_name, data
      FROM job_queue
      WHERE status = 'pending'
        AND ($1::bigint IS NULL OR id = $1::bigint)
      ORDER BY created_at ASC
      LIMIT 1
      FOR UPDATE SKIP LOCKED
    SQL
    next false unless job

    started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    status = run_job(job)
    tx.exec_params(<<~SQL, [status, job["id"]])
      UPDATE job_queue
      SET status = $1, worked_at = now()
      WHERE id = $2
    SQL
    elapsed = (Process.clock_gettime(Process::CLOCK_MONOTONIC) - started).round(2)
    puts "#{elapsed}s job #{job["id"]}: #{status}"
    true
  end
end

begin
  conn = PG.connect(ENV.fetch("DATABASE_URL"))
  # IF NOT EXISTS so a restarted worker does not fail on an existing table.
  conn.exec <<~SQL
    CREATE TABLE IF NOT EXISTS job_queue (
      id bigint NOT NULL,
      created_at timestamp DEFAULT now() NOT NULL,
      status text DEFAULT 'pending'::text NOT NULL,
      job_name text NOT NULL,
      data jsonb NOT NULL,
      worked_at timestamp
    );
  SQL
  # After each insert, send the new row's id on the job_queued channel.
  conn.exec <<~SQL
    CREATE OR REPLACE FUNCTION notify_job_queued() RETURNS TRIGGER AS $$
    BEGIN
      PERFORM
        pg_notify('job_queued', cast(NEW.id AS varchar));
      RETURN NULL;
    END;
    $$ LANGUAGE plpgsql;
  SQL
  # Drop first so re-running the script does not fail on an existing trigger.
  conn.exec <<~SQL
    DROP TRIGGER IF EXISTS on_job_queue on job_queue;
    CREATE TRIGGER on_job_queue
      AFTER INSERT ON job_queue
      FOR EACH ROW
      EXECUTE PROCEDURE notify_job_queued();
  SQL
  conn.exec "LISTEN job_queued"
  # NOTIFYs sent while no worker was listening are lost, so drain any backlog
  # now. LISTEN is issued first so jobs inserted during the drain still
  # produce a notification that is handled below.
  loop { break unless work_pending_job(conn) }
  puts "Waiting on job_queued channel..."
  loop do
    conn.wait_for_notify do |_channel, _pid, job_id|
      work_pending_job(conn, job_id)
    end
  end
ensure
  # Closing the session also drops its LISTEN registration. An explicit
  # UNLISTEN here would raise on a broken connection and skip the close.
  conn&.close
end
@ismasan
Copy link

ismasan commented Oct 23, 2020

I think the trigger setup needs to drop any existing trigger first, so that the script can be run more than once:

    DROP TRIGGER IF EXISTS on_job_queue on job_queue;
    CREATE TRIGGER on_job_queue
      AFTER INSERT ON job_queue
      FOR EACH ROW
      EXECUTE PROCEDURE notify_job_queued();

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment