Skip to content

Instantly share code, notes, and snippets.

@mbreit
Created March 17, 2017 20:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mbreit/7b84e8fd51b6d1889f4bc1f2612c3f9d to your computer and use it in GitHub Desktop.
Save mbreit/7b84e8fd51b6d1889f4bc1f2612c3f9d to your computer and use it in GitHub Desktop.
# Simple ActiveJob worker for PostgreSQL using LISTEN/NOTIFY.
#
# Supports most ActiveJob features like multiple queues, priorities
# and wait times.
#
# To use this as your Rails job queue, add this to your environment
# configuration (config/environments/production.rb):
#
# config.active_job.queue_adapter = PgJob::QueueAdapter.new
#
# Then run one or multiple workers for the default queue with
#
# bin/rails runner PgJob.work
#
# or for other queues with
#
# bin/rails runner "PgJob.work(:my_queue)"
#
class PgJob < ApplicationRecord
# QueueAdapter for ActiveJob.
class QueueAdapter
def enqueue(job)
Job.enqueue(job)
end
def enqueue_at(job, timestamp)
Job.enqueue(job, timestamp)
end
end
scope :due, -> { where('scheduled_for IS NULL OR scheduled_for <= ?', Time.current) }
scope :queue, -> { where(performed_at: nil).order(:priority, :created_at) }
validates :queue_name, format: {with: /\A[a-zA-Z1-9_]+\z/}
# Run a worker process for a given queue name.
# Will run all scheduled jobs in the queue ordered by their
# priorities (lowest first) and then wait for PostgreSQL LISTEN
# events to run new jobs. For jobs that are scheduled for a later
# time, it wakes up in an interval given by the timeout parameter
# to check for jobs that became due in the meantime.
#
# @param queue_name [String] The name of the queue to work on
# @param timeout [integer] Interval to check for due jobs
def self.work(queue_name = 'default', timeout: 10)
connection.execute "LISTEN pg_jobs_#{queue_name}"
loop do
# Consume all pending NOTIFY events
while connection.raw_connection.notifies; end
# Work jobs as long as there are pending jobs in the queue
while work_job(queue_name); end
# Wait for next NOTIFY event
connection.raw_connection.wait_for_notify(timeout)
end
ensure
connection.execute "UNLISTEN pg_jobs_#{queue_name}"
end
# Enqueue a new job to run at a given time or immediately
#
# @param job [ActiveJob::Base] The ActiveJob job object to schedule
# @param scheduled_for [Integer,Time] Timestamp when the job should be
# executed. Use nil if the job should be run immediately.
def self.enqueue(job, scheduled_for = nil)
queue_name = job.queue_name || 'default'
Job.create!(job_data: job.serialize,
scheduled_for: scheduled_for && Time.zone.at(scheduled_for),
priority: job.priority || 100,
queue_name: queue_name)
connection.execute "NOTIFY pg_jobs_#{queue_name}"
end
# Execute a single job from the given queue.
#
# @param queue_name [String] Name of the queue to look for a due job
def self.work_job(queue_name)
transaction do
job = queue.due.lock('FOR UPDATE SKIP LOCKED').find_by(queue_name: queue_name)
return false unless job
job.perform
job.performed_at = Time.zone.now
job.save!
end
end
# Execute the job. Calls `ActiveJob::Base.execute`.
def perform
ActiveJob::Base.execute(job_data)
rescue => e
Rails.logger.error("Error while executing job: #{e}\n" + e.backtrace.join('\n'))
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment