class AddFieldsToDelayedJobs < ActiveRecord::Migration
  def change
    add_column :delayed_jobs, :signature, :string
    add_column :delayed_jobs, :args, :text
  end
end
# /lib/delayed_duplicate_prevention_plugin.rb
require 'delayed_job'

class DelayedDuplicatePreventionPlugin < Delayed::Plugin
  module SignatureConcern
    extend ActiveSupport::Concern

    included do
      before_validation :add_signature
      validate :prevent_duplicate
    end

    private

    def add_signature
      self.signature = generate_signature
      self.args = self.payload_object.args
    end

    def generate_signature
      pobj = payload_object
      if pobj.object.respond_to?(:id) && pobj.object.id.present?
        sig = "#{pobj.object.class}"
        sig += ":#{pobj.object.id}"
      else
        sig = "#{pobj.object}"
      end
      sig += "##{pobj.method_name}"
      sig
    end

    def prevent_duplicate
      if DuplicateChecker.duplicate?(self)
        Rails.logger.warn "Found duplicate job(#{self.signature}), ignoring..."
        errors.add(:base, "This is a duplicate")
      end
    end
  end

  class DuplicateChecker
    attr_reader :job

    def self.duplicate?(job)
      new(job).duplicate?
    end

    def initialize(job)
      @job = job
    end

    def duplicate?
      possible_dupes = Delayed::Job.where(signature: job.signature)
      possible_dupes = possible_dupes.where.not(id: job.id) if job.id.present?
      possible_dupes.any? { |possible_dupe| args_match?(possible_dupe, job) }
    end

    private

    def args_match?(job1, job2)
      # TODO: make this logic robust
      normalize_args(job1.args) == normalize_args(job2.args)
    end

    def normalize_args(args)
      args.kind_of?(String) ? YAML.load(args) : args
    end
  end
end
# config/initializers/delayed_job.rb
require 'delayed_duplicate_prevention_plugin'

Delayed::Backend::ActiveRecord::Job.send(:include, DelayedDuplicatePreventionPlugin::SignatureConcern)
Delayed::Worker.plugins << DelayedDuplicatePreventionPlugin
@nedden - thanks, I refactored a bit. I think the first time I coded this it was a bit rushed. The code is still a bit gnarly, so YMMV.
Sadly, this doesn't work (at least not for me) if the object you are queuing is a custom job class. It only seems to work for jobs created through the .delay or handle_asynchronously methods. Otherwise the payload object has no embedded object and doesn't respond to 'method_name'.
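The limitation above follows from generate_signature calling `object` and `method_name` on the payload. A minimal sketch of a guarded version, using plain Structs as stand-ins for the payload so it runs without Rails. `PerformableStub`, `Record`, `CustomJob` and `signature_for` are hypothetical names, and falling back to the job's class name is an assumption of this sketch, not something the gist does:

```ruby
# Stand-in for the payload that .delay / handle_asynchronously produce
# (it exposes object, method_name and args).
PerformableStub = Struct.new(:object, :method_name, :args)

# Stand-in for a model instance with an id.
Record = Struct.new(:id)

# Hypothetical custom job class: it only responds to perform.
class CustomJob
  def initialize(user_id)
    @user_id = user_id
  end

  def perform; end
end

# Build a signature without assuming the payload wraps another object.
def signature_for(payload)
  unless payload.respond_to?(:object) && payload.respond_to?(:method_name)
    # Custom job: fall back to the class name (assumption of this sketch).
    return payload.class.name
  end

  target = payload.object
  base = if target.respond_to?(:id) && !target.id.nil?
           "#{target.class}:#{target.id}"
         else
           target.to_s
         end
  "#{base}##{payload.method_name}"
end
```

With this fallback every instance of a custom job class shares one signature, so telling them apart would still depend on comparing arguments. The `payload_object.args` call in add_signature would need a similar guard.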
Based on this, I just added a new field 'tag' to the delayed_jobs table, so I can specify a tag when launching the job. I could also add a validation to avoid duplicates (I didn't); instead, I first search by this tag to see whether a job is already enqueued.
class AddTagToDelayedJobs < ActiveRecord::Migration
  def change
    add_column :delayed_jobs, :tag, :string
  end
end

module DJTagExtension
  extend ActiveSupport::Concern

  included do
    attr_accessible :tag
  end
end

Delayed::Backend::ActiveRecord::Job.send(:include, DJTagExtension)

# I can specify the tag value when launching the job
Object.delay(queue: 'tracking', tag: "object_id_or_else", priority: 5).run!
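The "search first by tag" step is not shown above, so here is a rough sketch of the idea using an in-memory stand-in for the delayed_jobs table. `FakeQueue` and `enqueue_unless_tagged` are hypothetical names made up for this illustration:

```ruby
# In-memory stand-in for the delayed_jobs table; with the real backend the
# lookup would be a query on the tag column instead of an array scan.
class FakeQueue
  Job = Struct.new(:tag, :payload)

  def initialize
    @jobs = []
  end

  # True when a pending job already carries this tag.
  def tagged?(tag)
    @jobs.any? { |job| job.tag == tag }
  end

  # Enqueue only when no pending job has the same tag.
  # Returns the new job, or nil when a duplicate was skipped.
  def enqueue_unless_tagged(tag, payload)
    return nil if tagged?(tag)

    job = Job.new(tag, payload)
    @jobs << job
    job
  end

  def size
    @jobs.size
  end
end
```

Against the real table the check would presumably be something like `Delayed::Job.where(tag: tag).exists?` before calling `.delay`. A check-then-insert like this can still race when two processes enqueue at the same moment.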
I think I may have found a bug.
It was causing the following error during normalize_args(args):
"did not find expected node content while parsing a flow node"
Suggested fix here:
private

def add_signature
  self.signature = generate_signature
  self.args = self.payload_object.args.to_yaml #added ".to_yaml" to prevent
end
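A small standalone sketch of why the explicit `.to_yaml` helps: normalize_args passes any String to YAML.load, so the stored text has to be valid YAML. My assumption is that the error came from args being written to the text column in some other string form; I have not confirmed that. `dump_args` is a hypothetical helper name:

```ruby
require 'yaml'

# Serialize args explicitly before storing them in a text column,
# so the stored value is always valid YAML.
def dump_args(args)
  args.to_yaml
end

# Same logic as the gist's normalize_args: parse strings, pass others through.
def normalize_args(args)
  args.is_a?(String) ? YAML.load(args) : args
end

args = ["welcome", 42, { "locale" => "en" }]
stored = dump_args(args)

# The stored string parses back to the original arguments, so an in-memory
# job and a persisted job compare as equal.
same = normalize_args(stored) == normalize_args(args)
```

This only covers plain values such as strings, numbers, arrays and hashes. Arguments that are arbitrary objects may need extra handling depending on the YAML library version.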
I made some changes to the plugin to allow other duplicate prevention strategies. Available strategies are:
- prevent_duplicate (default) - prevents the current job from being added
- delete_previous_duplicate - finds existing jobs that would do the same work as the job being added and deletes them
- allow_duplicate - disables the plugin and allows duplicate jobs to be added
You can add your own strategy by writing a corresponding method and registering it in the @@strategies class variable.
Plugin here: https://gist.github.com/landovsky/8c505ecab41eb38fa1c2cd23058a6ae3
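To make the three strategies concrete, here is a toy in-memory model of how a strategy registry could dispatch. This is not the code from the linked plugin: `DedupQueue`, `enqueue` and the `strategy:` keyword are assumptions for illustration, and only the strategy names come from the list above.

```ruby
# Toy model of strategy dispatch; jobs are compared by signature only.
class DedupQueue
  Job = Struct.new(:signature)

  # Maps a strategy name to the method that implements it.
  @@strategies = {
    prevent_duplicate: :prevent_duplicate,
    delete_previous_duplicate: :delete_previous_duplicate,
    allow_duplicate: :allow_duplicate
  }

  attr_reader :jobs

  def initialize
    @jobs = []
  end

  # Returns true when the job was added, false when it was rejected.
  def enqueue(signature, strategy: :prevent_duplicate)
    handler = @@strategies.fetch(strategy) # raises KeyError if unregistered
    send(handler, Job.new(signature))
  end

  private

  # Keep the existing job and reject the new one.
  def prevent_duplicate(job)
    return false if @jobs.any? { |j| j.signature == job.signature }

    @jobs << job
    true
  end

  # Drop existing jobs with the same signature, then add the new one.
  def delete_previous_duplicate(job)
    @jobs.reject! { |j| j.signature == job.signature }
    @jobs << job
    true
  end

  # No duplicate check at all.
  def allow_duplicate(job)
    @jobs << job
    true
  end
end
```

In this model, adding a strategy means defining one more private method and adding its name to the hash.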
Why create a new column instead of querying the handler?
Because querying a long serialized string won't scale well when the table is large. I think a hybrid approach would work well, though: debounce not against the handler but against a key or signature that identifies the job.
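One way to read "a key or signature that identifies the job" is a short, fixed-length digest that fits in an indexed string column. A minimal sketch, assuming the key is derived from the class name, record id, method name and arguments; `job_key` is a hypothetical helper, not part of the gist:

```ruby
require 'digest'
require 'yaml'

# Build a fixed-length key from the parts that identify a job.
# Identical inputs always produce the same key, so an equality lookup
# on an indexed column can replace a scan of the serialized handler.
def job_key(class_name, record_id, method_name, args)
  raw = [class_name, record_id, method_name, args.to_yaml].join("|")
  Digest::SHA256.hexdigest(raw)
end
```

The key is sensitive to argument order, including the order of hash keys, so two calls that mean the same thing but list their arguments differently would get different keys.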
Hey @synth thanks a lot!
We made a gem based on that :)
https://github.com/noesya/delayed_job_prevent_duplicate
Very cool! gem > gist :)
@synth thanks.
I like interacting with the Active Job API rather than the delayed_job syntax. Taking some concepts from here, I implemented a unique job using an Active Job callback: https://gist.github.com/channainfo/b920eeda6b20576310c1fae9780dbedc
Hi! You can return false in add_signature if the signature is not unique in the table.
http://api.rubyonrails.org/classes/ActiveRecord/Callbacks.html