Prevent Duplicates with Delayed Jobs
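# db/migrate/add_signature_fields_to_delayed_jobs.rb
# The class name must match the camelized file name, or Rails cannot load the migration.
# On Rails 5+, pin the migration version as well, e.g. ActiveRecord::Migration[5.0].
class AddSignatureFieldsToDelayedJobs < ActiveRecord::Migration
  def change
    add_column :delayed_jobs, :signature, :string
    add_index :delayed_jobs, :signature
  end
end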
# config/initializers/delayed_job.rb
# avoid "undefined method" errors for ActiveJob jobs
class ActiveJob::QueueAdapters::DelayedJobAdapter::JobWrapper
  def object
    job_data['job_class'].constantize
  end

  def args
    job_data['arguments']
  end

  def method_name
    'perform'
  end
end
class DelayedDuplicatePreventionPlugin < Delayed::Plugin
  module SignatureConcern
    extend ActiveSupport::Concern

    included do
      # we only need to add the signature on create
      before_validation :add_signature, on: :create
      # check for duplicates on create only, so that rescheduling or failing
      # an existing job is never blocked by a newer pending duplicate
      validate :manage_duplicate, on: :create
    end

    private

    def add_signature
      self.signature = generate_signature
    end

    def generate_signature
      job_class_name = payload_object.object.name
      method_name = payload_object.method_name
      arguments = payload_object.args
      # e.g. MyAwesomeJob.perform(123, "articles")
      # Array#to_s yields '[123, "articles"]'; [1..-2] strips the brackets
      "#{job_class_name}.#{method_name}(#{arguments.to_s[1..-2]})"
    end

    def manage_duplicate
      # we want to allow duplicates of failed and currently running jobs,
      # for example: "after update, sync the record to an external service" (Elasticsearch, CMS, API)
      duplicates = Delayed::Job.where(signature: signature)
                               .where(failed_at: nil, locked_at: nil, locked_by: nil)
      return unless duplicates.exists?

      logger.warn "Found duplicate job #{signature}, ignoring..."
      errors.add(:base, 'This is a duplicate')
    end
  end
end
Delayed::Backend::ActiveRecord::Job.include DelayedDuplicatePreventionPlugin::SignatureConcern
Delayed::Worker.plugins << DelayedDuplicatePreventionPlugin
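
The signature format and the "pending jobs only" rule can be tried without Rails or Delayed::Job. The following is a minimal plain-Ruby sketch, not part of the plugin: `QueuedJob`, `build_signature`, and `duplicate?` are hypothetical names introduced here for illustration, and an in-memory array stands in for the `delayed_jobs` table.

```ruby
# Plain-Ruby sketch; no Rails or Delayed::Job required.
# QueuedJob is a hypothetical stand-in for a row in delayed_jobs.
QueuedJob = Struct.new(:signature, :failed_at, :locked_at, :locked_by, keyword_init: true)

# Mirrors generate_signature: Array#to_s yields '[123, "articles"]',
# and [1..-2] strips the surrounding brackets.
def build_signature(job_class_name, method_name, arguments)
  "#{job_class_name}.#{method_name}(#{arguments.to_s[1..-2]})"
end

# Mirrors manage_duplicate: a new job counts as a duplicate only when a job
# with the same signature is still waiting (not failed, not locked by a worker).
def duplicate?(queue, signature)
  queue.any? do |job|
    job.signature == signature &&
      job.failed_at.nil? && job.locked_at.nil? && job.locked_by.nil?
  end
end

signature = build_signature('MyAwesomeJob', 'perform', [123, 'articles'])
pending = QueuedJob.new(signature: signature)
running = QueuedJob.new(signature: signature, locked_at: Time.now, locked_by: 'worker-1')

puts signature                          # MyAwesomeJob.perform(123, "articles")
puts duplicate?([pending], signature)   # true: an identical job is still waiting
puts duplicate?([running], signature)   # false: the identical job is already running
```

Because the signature is built from the inspected argument list, two jobs only collide when their class, method, and arguments all render to the same string.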