Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 7 You must be signed in to star a gist
  • Fork 6 You must be signed in to fork a gist
  • Save landovsky/8c505ecab41eb38fa1c2cd23058a6ae3 to your computer and use it in GitHub Desktop.
Save landovsky/8c505ecab41eb38fa1c2cd23058a6ae3 to your computer and use it in GitHub Desktop.
Prevent Duplicates with Delayed Jobs
class AddFieldsToDelayedJobs < ActiveRecord::Migration
def change
add_column :delayed_jobs, :signature, :string, index: true
add_column :delayed_jobs, :args, :text
end
end
# /lib/delayed_duplicate_prevention_plugin.rb
require 'delayed_job'
class DelayedDuplicatePreventionPlugin < Delayed::Plugin
module SignatureConcern
extend ActiveSupport::Concern
included do
before_validation :add_signature
validate :manage_duplicate
end
attr_writer :strategy
@@strategies = [:delete_previous_duplicate, :prevent_duplicate, :allow_duplicate]
private
def add_signature
self.signature = generate_signature
self.args = self.payload_object.args.to_yaml
end
def generate_signature
pobj = payload_object
if pobj.object.respond_to?(:id) and pobj.object.id.present?
sig = "#{pobj.object.class}"
sig += ":#{pobj.object.id}"
else
sig = "#{pobj.object}"
end
sig += "##{pobj.method_name}"
return sig
end
def manage_duplicate
return if @strategy == :allow_duplicate
@strategy ||= :prevent_duplicate # default strategy
raise "Only the following strategies are permitted: #{@@strategies}" unless @@strategies.include?(@strategy)
@checker = DuplicateChecker.new(self)
return unless @checker.duplicate?
send(@strategy)
end
def prevent_duplicate
if @checker.duplicate?
Rails.logger.warn "Found duplicate job(#{self.signature}), ignoring..."
errors.add(:base, "This is a duplicate")
end
end
def allow_duplicate
end
def delete_previous_duplicate
if @checker.duplicate?
@checker.duplicates.each { |job| Delayed::Job.destroy(job.id) }
logger.warn "__________: #{self.class} #{__method__} DELETED #{@checker.duplicates.count} duplicate jobs"
end
end
end
class DuplicateChecker
attr_reader :job
attr_accessor :duplicates
def self.duplicate?(job)
new(job).duplicate?
end
def initialize(job)
@job = job
@duplicates = []
enumerate_duplicates
end
def enumerate_duplicates
possible_dupes = Delayed::Job.where(signature: job.signature)
possible_dupes = possible_dupes.where.not(id: job.id) if job.id.present?
result = possible_dupes.each do |possible_dupe|
duplicates << possible_dupe if possible_dupe.args == job.args
end
end
def duplicate?
!duplicates.empty?
end
private
def args_match?(job1, job2)
# TODO: make this logic robust
normalize_args(job1.args) == normalize_args(job2.args)
end
def normalize_args(args)
args.kind_of?(String) ? YAML.load(args) : args
end
end
end
# config/initializers/delayed_job.rb
require 'delayed_duplicate_prevention_plugin'
Delayed::Backend::ActiveRecord::Job.send(:include, DelayedDuplicatePreventionPlugin::SignatureConcern)
Delayed::Worker.plugins << DelayedDuplicatePreventionPlugin
@channainfo
Copy link

Taking some concept from here I implemented a unique job using an active job callback https://gist.github.com/channainfo/b920eeda6b20576310c1fae9780dbedc

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment