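# On Rails 5+, inherit from ActiveRecord::Migration[X.Y] using your app's Rails version
class AddFieldsToDelayedJobs < ActiveRecord::Migration
  def change
    add_column :delayed_jobs, :signature, :string
    add_column :delayed_jobs, :args, :text
    # add_column does not create an index from `index: true`, so add it explicitly
    add_index :delayed_jobs, :signature
  end
end
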
# /lib/delayed_duplicate_prevention_plugin.rb
require 'delayed_job'

class DelayedDuplicatePreventionPlugin < Delayed::Plugin
  module SignatureConcern
    extend ActiveSupport::Concern

    included do
      before_validation :add_signature
      validate :manage_duplicate
    end

    attr_writer :strategy

    @@strategies = [:delete_previous_duplicate, :prevent_duplicate, :allow_duplicate]

    private

    # Stores a "Class:id#method" signature plus the YAML-serialized arguments
    def add_signature
      self.signature = generate_signature
      self.args = payload_object.args.to_yaml
    end

    def generate_signature
      pobj = payload_object
      sig = if pobj.object.respond_to?(:id) && pobj.object.id.present?
              "#{pobj.object.class}:#{pobj.object.id}"
            else
              "#{pobj.object}"
            end
      "#{sig}##{pobj.method_name}"
    end

    def manage_duplicate
      return if @strategy == :allow_duplicate # skips the duplicate query entirely
      @strategy ||= :prevent_duplicate # default strategy
      raise "Only the following strategies are permitted: #{@@strategies}" unless @@strategies.include?(@strategy)
      @checker = DuplicateChecker.new(self)
      return unless @checker.duplicate?
      send(@strategy)
    end

    # Adds a validation error so the new (duplicate) job is not saved
    def prevent_duplicate
      if @checker.duplicate?
        Rails.logger.warn "Found duplicate job(#{signature}), ignoring..."
        errors.add(:base, "This is a duplicate")
      end
    end

    def allow_duplicate
    end

    # Destroys the already-enqueued duplicates and lets the new job be saved
    def delete_previous_duplicate
      if @checker.duplicate?
        @checker.duplicates.each { |job| Delayed::Job.destroy(job.id) }
        Rails.logger.warn "Deleted #{@checker.duplicates.count} duplicate job(s) of #{signature}"
      end
    end
  end

  class DuplicateChecker
    attr_reader :job
    attr_accessor :duplicates

    def self.duplicate?(job)
      new(job).duplicate?
    end

    def initialize(job)
      @job = job
      @duplicates = []
      enumerate_duplicates
    end

    # Collects other jobs with the same signature and identical serialized args
    def enumerate_duplicates
      possible_dupes = Delayed::Job.where(signature: job.signature)
      possible_dupes = possible_dupes.where.not(id: job.id) if job.id.present?
      possible_dupes.each do |possible_dupe|
        duplicates << possible_dupe if possible_dupe.args == job.args
      end
    end

    def duplicate?
      !duplicates.empty?
    end

    private

    def args_match?(job1, job2)
      # TODO: make this logic robust
      normalize_args(job1.args) == normalize_args(job2.args)
    end

    def normalize_args(args)
      args.kind_of?(String) ? YAML.load(args) : args
    end
  end
end

# config/initializers/delayed_job.rb
require 'delayed_duplicate_prevention_plugin'

Delayed::Backend::ActiveRecord::Job.include(DelayedDuplicatePreventionPlugin::SignatureConcern)
Delayed::Worker.plugins << DelayedDuplicatePreventionPlugin
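To see what the signature and duplicate check above actually compare, here is a minimal sketch that runs without Rails. The `Payload` and `Record` Structs are hypothetical stand-ins for a `Delayed::PerformableMethod` payload and a model instance, and `nil?` replaces ActiveSupport's `present?`. It mirrors the plugin's logic and is not the plugin itself.

```ruby
require 'yaml'

# Hypothetical stand-ins for a Delayed::PerformableMethod payload and an
# ActiveRecord model instance; the plugin uses the real payload_object instead.
Payload = Struct.new(:object, :method_name, :args)
Record  = Struct.new(:id)

# Mirrors generate_signature: "Class:id#method" when the target has an id,
# otherwise the object's string form followed by "#method".
# Uses nil? in place of ActiveSupport's present? to stay dependency-free.
def signature_for(payload)
  obj = payload.object
  base = if obj.respond_to?(:id) && !obj.id.nil?
           "#{obj.class}:#{obj.id}"
         else
           obj.to_s
         end
  "#{base}##{payload.method_name}"
end

# Mirrors DuplicateChecker: same signature and identical YAML-serialized args.
def duplicate_payloads?(a, b)
  signature_for(a) == signature_for(b) && a.args.to_yaml == b.args.to_yaml
end

first  = Payload.new(Record.new(42), :send_digest, ['weekly'])
second = Payload.new(Record.new(42), :send_digest, ['weekly'])
other  = Payload.new(Record.new(42), :send_digest, ['daily'])

puts signature_for(first)                                # Record:42#send_digest
puts duplicate_payloads?(first, second)                  # true
puts duplicate_payloads?(first, other)                   # false
puts signature_for(Payload.new('Mailer', :deliver, []))  # Mailer#deliver
```

The signature alone is deliberately coarse: it groups jobs for the same record and method, and the serialized arguments then decide whether two jobs are really the same.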
Hi! Thanks for sharing. Just a typo: in delayed_duplicate_prevention_plugin.rb:73 there is a space in "enume rate_duplicates".
Thanks for the catch. Fixed!
Thanks for the solution!
I've noticed that your code checks for duplicates even for the allow_duplicate strategy, which makes an unnecessary DB query.
Also, since you're querying the records based on the signature, you should add an index to that column.
Here's how I'd rewrite the manage_duplicate method:
def manage_duplicate
  @strategy ||= :prevent_duplicate # default strategy
  raise("Only the following strategies are permitted: #{@@strategies}") unless @@strategies.include?(@strategy)
  @checker = DuplicateChecker.new(self)
  return unless @checker.duplicate?
  send(@strategy)
end
Thank you, Viktor. All valid observations. I've updated the code.
@landovsky thanks! I'm just adding this code to my project. I noticed that this line unfortunately won't work:
https://gist.github.com/landovsky/8c505ecab41eb38fa1c2cd23058a6ae3#file-add_signature_fields_to_delayed_jobs-rb-L3
add_column :delayed_jobs, :signature, :string, index: true
It seems this does not add the index, yet it doesn't raise any error either. It simply fails silently. :/
I tried to bump the discussion on this again: rails/rails#39230
I'm not sure why, but I get a NoMethodError for payload_object.object and payload_object.arguments, as my payload_objects only have a job_data reader method. I'm using Rails, so that might be due to ActiveJob.
Anyway, is there any reason for comparing the signature + the arguments? I've revised the code above and simplified it by creating and checking an MD5 checksum using the Job class and arguments: https://gist.github.com/danielpuglisi/08bc1c530064fc03463ed109ef95f598
Would love to hear some feedback.
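For comparison, a checksum-based key could look roughly like the sketch below. The `job_checksum` helper is hypothetical and not the code in the linked gist. It hashes the job class together with a JSON encoding of the arguments, so one indexed column can replace the separate signature and args comparison. MD5 is used here only as a compact lookup key, not for security. JSON output follows hash key order, so arguments must be built consistently for equal jobs to produce equal checksums.

```ruby
require 'digest/md5'
require 'json'

# Hypothetical helper: one fixed-length key derived from the job class and its
# arguments. JSON output depends on hash key order, so callers must build
# arguments consistently for equal jobs to get equal checksums.
def job_checksum(job_class, arguments)
  Digest::MD5.hexdigest("#{job_class}:#{JSON.generate(arguments)}")
end

a = job_checksum('NewsletterJob', [1, 'weekly'])
b = job_checksum('NewsletterJob', [1, 'weekly'])
c = job_checksum('NewsletterJob', [1, 'daily'])

puts a.length  # 32
puts a == b    # true
puts a == c    # false
```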
Here's my try:
https://gist.github.com/vfonic/fe509d53929dfc70d0463c00feec2606
I've removed the strategies, as I only need one strategy for now.
I don't generate an MD5 checksum, as I don't see any added value. There's also a risk of two different signatures ending up with the same MD5 checksum. It's rare, but why add this layer of complexity?
I've fixed the payload_object.object issue by extending the DelayedJob ActiveJob adapter class.
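The following is a guess at the shape of such an extension, not the code in the linked gist. The module name `JobWrapperSignatureReaders` is hypothetical. It assumes the ActiveJob wrapper exposes the serialized job as a `job_data` hash with `"job_class"` and `"arguments"` keys, and it adds the `object`, `method_name`, and `args` readers that `generate_signature` and `add_signature` call. A Struct stands in for the wrapper so the sketch runs without Rails.

```ruby
# Hypothetical module: gives ActiveJob's delayed_job wrapper the readers the
# plugin calls (object, method_name, args), derived from job_data.
module JobWrapperSignatureReaders
  # Job class name, e.g. "NewsletterJob". A String has no #id, so the
  # plugin's signature becomes "NewsletterJob#perform".
  def object
    job_data['job_class']
  end

  def method_name
    :perform
  end

  # The serialized ActiveJob arguments
  def args
    job_data['arguments']
  end
end

# In a Rails app this might be mixed in from an initializer, e.g.:
#   ActiveJob::QueueAdapters::DelayedJobAdapter::JobWrapper.include(JobWrapperSignatureReaders)
# Here a Struct stands in for the wrapper so the sketch runs on plain Ruby.
FakeWrapper = Struct.new(:job_data) { include JobWrapperSignatureReaders }

wrapper = FakeWrapper.new({ 'job_class' => 'NewsletterJob', 'arguments' => [1, 'weekly'] })
puts "#{wrapper.object}##{wrapper.method_name}"  # NewsletterJob#perform
p wrapper.args                                   # [1, "weekly"]
```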
@vfonic Agreed. Thanks for the update. I updated my gist based on yours, as I need the same functionality with locked/failed jobs 👍 I simplified it some more by using exists? instead of blank? to check for existing jobs.
I thought about using jsonb (PostgreSQL) for the handler column so we could drop the signature column entirely. I don't have the time to investigate how much delayed_job depends on the YAML format and what would need to change to make that happen. There has been an open issue since 2016 (collectiveidea/delayed_job#957), and maybe someone could make that happen :)
Taking some concepts from here, I implemented a unique job using an ActiveJob callback: https://gist.github.com/channainfo/b920eeda6b20576310c1fae9780dbedc
Nice! Love the strategy approach