Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save channainfo/b920eeda6b20576310c1fae9780dbedc to your computer and use it in GitHub Desktop.
Save channainfo/b920eeda6b20576310c1fae9780dbedc to your computer and use it in GitHub Desktop.
Unique jobs with delayed_job using the Active Job API
# Adds a +signature+ column to the delayed_jobs table. UniqueableJob stores a
# SHA-256 hex digest (64 characters) of the job class and arguments there and
# looks jobs up by it to detect duplicates.
class AddSignatureToDelayedJobs < ActiveRecord::Migration[7.0]
  def change
    add_column :delayed_jobs, :signature, 'varchar(64)'
    # `add_column` does not build an index from an `index:` option (that
    # shorthand belongs to create_table/change_table column definitions), so
    # the index used by the duplicate lookup is created explicitly.
    add_index :delayed_jobs, :signature
  end
end
# frozen_string_literal: true
# Base class for the application's jobs.
#
# Mixes in UniqueableJob, whose de-duplication is off unless a subclass
# overrides +allow_unique_job+, and routes jobs to the :default queue.
class ApplicationJob < ActiveJob::Base
  include UniqueableJob

  queue_as :default
end
require 'rails_helper'
# Minimal job used by this spec: it performs no work and opts in to the
# uniqueness check by overriding +allow_unique_job+.
class ApplicationDummyJob < ApplicationJob
  # Intentionally empty; the spec only observes whether a job row is created.
  def perform(app_id:); end

  # Opts this job class in to de-duplication.
  def allow_unique_job
    true
  end
end
RSpec.describe ApplicationDummyJob do
  # The examples count rows in Delayed::Job, so jobs must be queued rather
  # than run inline. Delayed::Worker.delay_jobs is a process-wide setting, so
  # the previous value is restored afterwards to avoid leaking into other specs.
  around do |example|
    previous_delay_jobs = Delayed::Worker.delay_jobs
    Delayed::Worker.delay_jobs = true
    begin
      example.run
    ensure
      Delayed::Worker.delay_jobs = previous_delay_jobs
    end
  end

  context 'when allow_unique_job is true' do
    it "enqueue to job successfully" do
      expect { described_class.perform_later(app_id: 1) }
        .to change(Delayed::Job, :count).by(1)
    end

    it 'enqueue only one unique job' do
      # app_id: 1 is enqueued twice; only the first one should be stored.
      expect do
        described_class.perform_later(app_id: 1)
        described_class.perform_later(app_id: 2)
        described_class.perform_later(app_id: 1)
      end.to change(Delayed::Job, :count).by(2)
    end
  end

  context 'when allow_unique_job is false' do
    it 'always enqueue job' do
      allow_any_instance_of(described_class).to receive(:allow_unique_job).and_return(false)

      # With the check disabled, the repeated app_id: 1 is stored as well.
      expect do
        described_class.perform_later(app_id: 1)
        described_class.perform_later(app_id: 2)
        described_class.perform_later(app_id: 1)
      end.to change(Delayed::Job, :count).by(3)
    end
  end
end
# frozen_string_literal: true
# Opt-in de-duplication for Active Job classes whose jobs are stored in
# Delayed::Job.
#
# A job class opts in by overriding #allow_unique_job to return true. Before
# such a job is enqueued, a signature (SHA-256 of class name + arguments) is
# looked up in the delayed_jobs table; if a matching job is still waiting,
# the enqueue is skipped. After a successful enqueue the signature is written
# to the new Delayed::Job row.
#
# NOTE(review): the lookup and the signature write are separate steps (the
# signature is only stored after the row exists), so two enqueues racing each
# other can both pass the check. A unique index or lock would be needed to
# rule that out.
module UniqueableJob
  extend ActiveSupport::Concern

  included do
    around_enqueue do |job, block|
      around_enqueue_unique(job, block)
    end

    after_enqueue do |job|
      after_enqueue_unique(job)
    end
  end

  # Runs the enqueue (+block+) unless uniqueness is enabled for +job+ and an
  # equivalent job is already waiting.
  #
  # @param job [ActiveJob::Base] the job about to be enqueued
  # @param block [#call] continues the enqueue when called
  def around_enqueue_unique(job, block)
    return block.call unless job.allow_unique_job

    if unique_job_pending?(generate_job_signature(job))
      Rails.logger.info { 'Job already scheduled' }
    else
      Rails.logger.info { 'Scheduling a new job' }
      block.call
    end
  end

  # Stores the signature on the Delayed::Job row created for +job+ so later
  # enqueues can find it.
  #
  # @param job [ActiveJob::Base] the job that was just enqueued
  def after_enqueue_unique(job)
    return unless job.allow_unique_job
    # No provider id means there is no row to tag; skip the pointless query.
    return if job.provider_job_id.nil?

    delayed_job = Delayed::Job.find_by(id: job.provider_job_id)
    delayed_job&.update!(signature: generate_job_signature(job))
  end

  # Builds the signature used to recognise equivalent jobs.
  #
  # @param job [ActiveJob::Base]
  # @return [String] 64-character SHA-256 hex digest of "<class>-<arguments as JSON>"
  def generate_job_signature(job)
    job_signature = "#{job.class}-#{job.arguments.to_json}"
    Digest::SHA256.hexdigest(job_signature)
  end

  # Whether de-duplication applies to this job. Off by default; override in a
  # job class and return true to enable it.
  #
  # @return [Boolean]
  def allow_unique_job
    false
  end

  private

  # True when a job with +signature+ is still waiting to run: not locked by a
  # worker and not marked as failed. Rows with failed_at set are ignored so a
  # permanently failed job left in the table cannot block new enqueues.
  def unique_job_pending?(signature)
    Delayed::Job.where(signature: signature, locked_at: nil, failed_at: nil).exists?
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment