@synth
Last active November 21, 2023 10:03
Prevent Duplicates with Delayed Jobs
class AddFieldsToDelayedJobs < ActiveRecord::Migration
  def change
    add_column :delayed_jobs, :signature, :string
    add_column :delayed_jobs, :args, :text
  end
end
# /lib/delayed_duplicate_prevention_plugin.rb
require 'delayed_job'

class DelayedDuplicatePreventionPlugin < Delayed::Plugin
  module SignatureConcern
    extend ActiveSupport::Concern

    included do
      before_validation :add_signature
      validate :prevent_duplicate
    end

    private

    def add_signature
      self.signature = generate_signature
      self.args = self.payload_object.args
    end

    def generate_signature
      pobj = payload_object
      if pobj.object.respond_to?(:id) && pobj.object.id.present?
        sig = "#{pobj.object.class}"
        sig += ":#{pobj.object.id}"
      else
        sig = "#{pobj.object}"
      end
      sig += "##{pobj.method_name}"
      sig
    end

    def prevent_duplicate
      if DuplicateChecker.duplicate?(self)
        Rails.logger.warn "Found duplicate job(#{self.signature}), ignoring..."
        errors.add(:base, "This is a duplicate")
      end
    end
  end

  class DuplicateChecker
    attr_reader :job

    def self.duplicate?(job)
      new(job).duplicate?
    end

    def initialize(job)
      @job = job
    end

    def duplicate?
      possible_dupes = Delayed::Job.where(signature: job.signature)
      possible_dupes = possible_dupes.where.not(id: job.id) if job.id.present?
      possible_dupes.any? { |possible_dupe| args_match?(possible_dupe, job) }
    end

    private

    def args_match?(job1, job2)
      # TODO: make this logic robust
      normalize_args(job1.args) == normalize_args(job2.args)
    end

    def normalize_args(args)
      args.kind_of?(String) ? YAML.load(args) : args
    end
  end
end
# config/initializers/delayed_job.rb
require 'delayed_duplicate_prevention_plugin'
Delayed::Backend::ActiveRecord::Job.send(:include, DelayedDuplicatePreventionPlugin::SignatureConcern)
Delayed::Worker.plugins << DelayedDuplicatePreventionPlugin
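
To make the signature format concrete, here is a minimal plain-Ruby sketch of the rules in generate_signature and normalize_args. Payload and Record are hypothetical stand-ins for the delayed_job payload object and a model record, and plain truthiness replaces ActiveSupport's present?, so treat it as an illustration rather than the plugin's actual code path.

```ruby
require 'yaml'

# Hypothetical stand-ins for a delayed_job payload object and a model record.
Payload = Struct.new(:object, :method_name, :args)
Record  = Struct.new(:id)

# Same rule as generate_signature: "Class:id#method" for objects with an id,
# "object#method" for everything else (for example a class receiving .delay).
def signature_for(payload)
  obj = payload.object
  target = obj.respond_to?(:id) && obj.id ? "#{obj.class}:#{obj.id}" : obj.to_s
  "#{target}##{payload.method_name}"
end

# Same rule as normalize_args: a String is parsed as YAML, anything else is used as-is.
def normalize_args(args)
  args.is_a?(String) ? YAML.load(args) : args
end

record_job = Payload.new(Record.new(42), :refresh, [1, "full"])
class_job  = Payload.new(String, :new, [])

puts signature_for(record_job)  # Record:42#refresh
puts signature_for(class_job)   # String#new
puts normalize_args(record_job.args.to_yaml) == record_job.args  # true
```

Two jobs count as duplicates only when both the signature and the normalized args match, which is why the args column is stored alongside the signature.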
@nedden

nedden commented Feb 25, 2015

Hi! You can return false in add_signature if the signature is not unique in the table:

def add_signature
  self.signature = generate_signature
  self.args = self.payload_object.args
  if Delayed::Job.exists?(signature: self.signature)
    return false
  end
end

http://api.rubyonrails.org/classes/ActiveRecord/Callbacks.html

If a before_* callback returns false, all the later callbacks and the associated action are cancelled. If an after_* callback returns false, all the later callbacks are cancelled. Callbacks are generally run in the order they are defined, with the exception of callbacks defined as methods on the model, which are called last.
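
A caveat for newer Rails versions: since Rails 5, returning false from a before_* callback no longer halts the chain, and throw(:abort) is used instead. Below is a minimal sketch of the same check under that rule. The module name HaltOnDuplicateSignature is hypothetical, and the sketch assumes the signature and args columns plus the generate_signature method from the gist.

```ruby
# Hypothetical module; it expects the including model to provide
# generate_signature, payload_object, and the signature/args attributes.
module HaltOnDuplicateSignature
  private

  def add_signature
    self.signature = generate_signature
    self.args = payload_object.args
    # Rails 5+: halt the callback chain explicitly instead of returning false.
    throw(:abort) if Delayed::Job.exists?(signature: signature)
  end
end
```

Like the snippet in the comment, this compares signatures only: it ignores args and does not exclude the job's own row, both of which the gist's DuplicateChecker handles.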

@synth
Author

synth commented Mar 20, 2015

@nedden - thanks, I refactored a bit. I think the first time I coded this it was a bit rushed. The code is still a bit gnarly, so YMMV.

@philsmy

philsmy commented Jul 28, 2015

Sadly, this doesn't work (at least not for me) if the object you are queuing is a custom class. This only seems to work using the .delay or handle_asynchronously methods. Otherwise the payload object doesn't have an embedded object (or respond to 'method_name').
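
One possible way around this, sketched in plain Ruby: check what the payload responds to and fall back to the job class when it has no object or method_name. DelayPayload, ReindexJob, payload_signature, and args_for are all hypothetical names for illustration, and the "Class:id" branch of the gist's signature is left out for brevity.

```ruby
# Hypothetical stand-ins: a .delay-style payload and a custom job class.
DelayPayload = Struct.new(:object, :method_name, :args)

class ReindexJob
  attr_reader :index_name

  def initialize(index_name)
    @index_name = index_name
  end

  def perform; end
end

# Use object/method_name when the payload exposes them; otherwise fall back
# to the job class and its #perform method.
def payload_signature(payload)
  if payload.respond_to?(:object) && payload.respond_to?(:method_name)
    "#{payload.object}##{payload.method_name}"
  else
    "#{payload.class}#perform"
  end
end

# Payloads without #args contribute their instance variable values instead.
def args_for(payload)
  return payload.args if payload.respond_to?(:args)

  payload.instance_variables.sort.map { |ivar| payload.instance_variable_get(ivar) }
end

puts payload_signature(DelayPayload.new(String, :new, []))  # String#new
puts payload_signature(ReindexJob.new("users"))             # ReindexJob#perform
```

Comparing instance variables is only a rough proxy for "same arguments"; a custom job could instead expose its own args method.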

@jmrepetti

Based on this, I added a new 'tag' field to the delayed_jobs table so that I can specify a tag when launching the job. I could also add a validation to avoid duplicates (I didn't); instead, before enqueuing I search by this tag to see whether a job is already enqueued.

class AddTagToDelayedJobs < ActiveRecord::Migration
  def change
    add_column :delayed_jobs, :tag, :string
  end
end 


module DJTagExtension
  extend ActiveSupport::Concern

  included do
    attr_accessible :tag
  end
end


Delayed::Backend::ActiveRecord::Job.send(:include, DJTagExtension)

# I can specify the tag value when launching the job
Object.delay(queue: 'tracking', tag: "object_id_or_else", priority: 5).run!
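
The "search first" step described above could look roughly like this. enqueue_unless_tagged is a hypothetical helper, not part of delayed_job. It assumes the tag column from the migration and takes the job class as a parameter so the check can be tried without a database.

```ruby
# Hypothetical helper: yields (to enqueue) only when no queued job carries `tag`.
# `jobs` defaults to Delayed::Job; anything responding to exists?(tag: ...) works.
def enqueue_unless_tagged(tag, jobs: Delayed::Job)
  return false if jobs.exists?(tag: tag)

  yield
  true
end

# In an app this might be used as (Tracker is a made-up class):
#   enqueue_unless_tagged("tracking:42") do
#     Tracker.delay(queue: 'tracking', tag: "tracking:42", priority: 5).run!
#   end
```

The check and the insert are separate steps, so two processes enqueuing at the same moment could both pass the check; a unique index on the tag column would be one way to close that gap.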

@landovsky

landovsky commented Apr 16, 2017

I think I may have found a bug.

It was causing the following error during normalize_args(args):
"did not find expected node content while parsing a flow node"

Suggested fix here:

private

def add_signature
  self.signature = generate_signature
  self.args = self.payload_object.args.to_yaml # added ".to_yaml" to prevent
end

@landovsky

landovsky commented Apr 17, 2017

I made some changes to the plugin, allowing for other duplicate prevention strategies. Available strategies are:

  • prevent_duplicate (default) - prevents the current job from being added
  • delete_previous_duplicate - finds existing jobs that would do the same as the job being added and deletes them
  • allow_duplicate - disables the plugin and allows duplicate jobs to be added

You can add your own strategy by writing a new corresponding method and registering the strategy in the @@strategies class variable.
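
As a rough illustration of how the three strategies differ, here is a toy in-memory queue in plain Ruby. It is not the linked plugin's API: QueuedJob, DedupQueue, and the STRATEGIES constant are hypothetical names, and only the strategy names come from the list above.

```ruby
# Toy model of the three strategies; a job is a duplicate when both
# signature and args match an already queued job.
QueuedJob = Struct.new(:signature, :args)

class DedupQueue
  STRATEGIES = %i[prevent_duplicate delete_previous_duplicate allow_duplicate].freeze

  attr_reader :jobs

  def initialize
    @jobs = []
  end

  # Returns true when the job was added, false when it was rejected.
  def enqueue(job, strategy: :prevent_duplicate)
    raise ArgumentError, "unknown strategy: #{strategy}" unless STRATEGIES.include?(strategy)

    send(strategy, job)
  end

  private

  def duplicates_of(job)
    @jobs.select { |queued| queued.signature == job.signature && queued.args == job.args }
  end

  # Keep the queued job and reject the new one.
  def prevent_duplicate(job)
    return false if duplicates_of(job).any?

    @jobs << job
    true
  end

  # Drop the queued duplicates and keep the new job.
  def delete_previous_duplicate(job)
    @jobs -= duplicates_of(job)
    @jobs << job
    true
  end

  # No check at all.
  def allow_duplicate(job)
    @jobs << job
    true
  end
end
```

Adding a strategy in this sketch means writing one more private method and listing its name in STRATEGIES, which mirrors the registration step described above.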

Plugin here: https://gist.github.com/landovsky/8c505ecab41eb38fa1c2cd23058a6ae3

@blairanderson

Why create a new column instead of querying the handler?

https://stackoverflow.com/a/70041500/1536309

@synth
Author

synth commented Nov 19, 2021

Because querying a long serialized string won't scale well when the table is large. I think a hybrid approach would work well, though, where you debounce not against the handler but against a key or signature that identifies the job.
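
One way to get such a key, sketched under assumptions: hash the parts that identify the job into a fixed-length string that fits a short, indexable column. job_key and its parameters are hypothetical; the gist itself stores a readable signature plus a separate args column instead.

```ruby
require 'digest'
require 'json'

# Hypothetical helper: collapses class name, record id, method name, and args
# into a 64-character hex key suitable for an indexed string column.
def job_key(class_name, record_id, method_name, args)
  Digest::SHA256.hexdigest(JSON.generate([class_name, record_id, method_name.to_s, args]))
end

puts job_key("User", 42, :refresh, [1, "full"]).length  # 64
```

This assumes the args serialize to JSON deterministically; hashes with the same pairs in a different key order would produce different keys.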

@arnaudlevy

Hey @synth thanks a lot!
We made a gem based on that :)
https://github.com/noesya/delayed_job_prevent_duplicate

@synth
Author

synth commented Sep 7, 2023

Very cool! gem > gist :)

@channainfo

@synth thanks.

I prefer interacting with the Active Job API rather than the delayed_job syntax. Taking some concepts from here, I implemented a unique job using an Active Job callback: https://gist.github.com/channainfo/b920eeda6b20576310c1fae9780dbedc

@arnaudlevy

@SebouChu @pabois should we integrate this into the gem?
