Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save landovsky/8c505ecab41eb38fa1c2cd23058a6ae3 to your computer and use it in GitHub Desktop.
Save landovsky/8c505ecab41eb38fa1c2cd23058a6ae3 to your computer and use it in GitHub Desktop.
Prevent Duplicates with Delayed Jobs
class AddFieldsToDelayedJobs < ActiveRecord::Migration
def change
add_column :delayed_jobs, :signature, :string, index: true
add_column :delayed_jobs, :args, :text
end
end
# /lib/delayed_duplicate_prevention_plugin.rb
require 'delayed_job'
class DelayedDuplicatePreventionPlugin < Delayed::Plugin
module SignatureConcern
extend ActiveSupport::Concern
included do
before_validation :add_signature
validate :manage_duplicate
end
attr_writer :strategy
@@strategies = [:delete_previous_duplicate, :prevent_duplicate, :allow_duplicate]
private
def add_signature
self.signature = generate_signature
self.args = self.payload_object.args.to_yaml
end
def generate_signature
pobj = payload_object
if pobj.object.respond_to?(:id) and pobj.object.id.present?
sig = "#{pobj.object.class}"
sig += ":#{pobj.object.id}"
else
sig = "#{pobj.object}"
end
sig += "##{pobj.method_name}"
return sig
end
def manage_duplicate
return if @strategy == :allow_duplicate
@strategy ||= :prevent_duplicate # default strategy
raise "Only the following strategies are permitted: #{@@strategies}" unless @@strategies.include?(@strategy)
@checker = DuplicateChecker.new(self)
return unless @checker.duplicate?
send(@strategy)
end
def prevent_duplicate
if @checker.duplicate?
Rails.logger.warn "Found duplicate job(#{self.signature}), ignoring..."
errors.add(:base, "This is a duplicate")
end
end
def allow_duplicate
end
def delete_previous_duplicate
if @checker.duplicate?
@checker.duplicates.each { |job| Delayed::Job.destroy(job.id) }
logger.warn "__________: #{self.class} #{__method__} DELETED #{@checker.duplicates.count} duplicate jobs"
end
end
end
class DuplicateChecker
attr_reader :job
attr_accessor :duplicates
def self.duplicate?(job)
new(job).duplicate?
end
def initialize(job)
@job = job
@duplicates = []
enumerate_duplicates
end
def enumerate_duplicates
possible_dupes = Delayed::Job.where(signature: job.signature)
possible_dupes = possible_dupes.where.not(id: job.id) if job.id.present?
result = possible_dupes.each do |possible_dupe|
duplicates << possible_dupe if possible_dupe.args == job.args
end
end
def duplicate?
!duplicates.empty?
end
private
def args_match?(job1, job2)
# TODO: make this logic robust
normalize_args(job1.args) == normalize_args(job2.args)
end
def normalize_args(args)
args.kind_of?(String) ? YAML.load(args) : args
end
end
end
# config/initializers/delayed_job.rb
require 'delayed_duplicate_prevention_plugin'
Delayed::Backend::ActiveRecord::Job.send(:include, DelayedDuplicatePreventionPlugin::SignatureConcern)
Delayed::Worker.plugins << DelayedDuplicatePreventionPlugin
@synth
Copy link

synth commented Apr 12, 2019

Nice! Love the strategy approach

@nnanel
Copy link

nnanel commented Apr 15, 2021

Hi! Thanks for sharing. Just a typo in delayed_duplicate_prevention_plugin.rb:73 there is a space in "enume rate_duplicates"

@landovsky
Copy link
Author

Hi! Thanks for sharing. Just a typo in delayed_duplicate_prevention_plugin.rb:73 there is a space in "enume rate_duplicates"

Thanks for the catch. Fixed!

@vfonic
Copy link

vfonic commented Jul 2, 2021

Thanks for the solution!

I've noticed that your code checks for duplicates, even for allow_duplicate strategy. This makes the unnecessary db query.

Also, since you're querying the records based on the signature, you should add the index to that column.

Here's how I'd rewrite manage_duplicate method:

def manage_duplicate
  @strategy ||= :prevent_duplicate # default strategy
  raise("Only the following strategies are permitted: #{@@strategies}") unless @@strategies.include?(@strategy)
  @checker = DuplicateChecker.new(self)
  return unless @checker.duplicate?

  send(@strategy)
end

@landovsky
Copy link
Author

Thanks for the solution!

I've noticed that your code checks for duplicates, even for allow_duplicate strategy. This makes the unnecessary db query.

Also, since you're querying the records based on the signature, you should add the index to that column.

Thank you Viktor. All valid observations. I've updated the code.

@vfonic
Copy link

vfonic commented Jul 4, 2021

@landovsky thanks! I'm just adding this code to my project. I noticed that this line unfortunately won't work:
https://gist.github.com/landovsky/8c505ecab41eb38fa1c2cd23058a6ae3#file-add_signature_fields_to_delayed_jobs-rb-L3

add_column :delayed_jobs, :signature, :string, index: true

Seems like this does not add the index, but it also doesn't even raise any errors. It simply fails silently. :/

I tried to bump the discussion on this again: rails/rails#39230

@danielpuglisi
Copy link

I'm not sure why, but I get a no method error for payload_object.object and payload_object.arguments as my payload_objects only have a job_data reader method. I'm using Rails, so that might be due to ActiveJob.

Anyway, is there any reason for comparing the signature + the arguments? I've revised the code above and simplified it by creating and checking an MD5 checksum using the Job class and arguments: https://gist.github.com/danielpuglisi/08bc1c530064fc03463ed109ef95f598

Would love to hear some feedback.

@vfonic
Copy link

vfonic commented Jul 6, 2021

Here's my try:

https://gist.github.com/vfonic/fe509d53929dfc70d0463c00feec2606

I've removed the strategies, as I only need one strategy for now.
I don't generate MD5 checksum, as I don't see any added value. There's also risk of two MD5 checksums ending up being the same even if the original signature is not the same. It's rare, but why add this layer of complexity.
I've fixed the payload_object.object by extending the DelayedJob ActiveJob adapter class.

@danielpuglisi
Copy link

@vfonic Agreed. Thanks for the update. Updated my gist based on yours as I need the same functionality with locked / failed jobs 👍 Simplified it some more using exists? instead of blank? for checking existing jobs.

Thought about using jsonb (Postgresql) for the handler column so we could dump the signature column entirely. I don't have the time to investigate how much delayed job is dependent on the YAML format and what would need to change to make that happen. There is an open issue since 2016 (collectiveidea/delayed_job#957) and maybe someone could make that happen :)

@channainfo
Copy link

Taking some concept from here I implemented a unique job using an active job callback https://gist.github.com/channainfo/b920eeda6b20576310c1fae9780dbedc

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment