Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Delayed job: uniqueness support, sequential execution support, keep successful jobs
# frozen_string_literal: true
# config/initializers/delayed_job_ext.rb
# unique jobs across workers
# Sample: my_model.delay(across_uniq_key: 'article-19').create
# Sample: my_model.delay(across_uniq_key: 'article-19').update
# Sample: my_model.delay(across_uniq_key: 'article-19').destroy
# ==> the jobs will be processed in serial, not in parallel:
# can not call update before create or run both at the same time
Delayed::Job.class_eval do
class << self
alias_method :ready_to_run_original, :ready_to_run
def ready_to_run(*args)
query = ready_to_run_original(*args)
current_keys = current_across_job_keys
return query unless current_keys.any?
exclude_current_across_jobs(query, current_keys)
end
private
def current_across_job_keys
Delayed::Job.unscoped.where.not(locked_at: nil, across_uniq_key: nil)
.pluck(:across_uniq_key)
end
def exclude_current_across_jobs(query, current_across_keys)
query_filter = '(locked_at is NULL AND across_uniq_key not in (?)) OR
across_uniq_key is NULL OR locked_at is not NULL'
query.where(query_filter, current_across_keys)
end
end
end
# unique pending jobs (Default true): skip if there is a similar pending job
# Sample: my_model.delay(unique: false).greeting(msg: 'Hello')
# Sample: my_model.delay(unique: true).greeting(msg: 'Hello')
Delayed::Job.class_eval do
attr_writer :unique # boolean: true/false
before_validation :generate_key, if: :unique
validate :verify_uniqueness, if: :unique
# default unique: true
def unique
@unique.nil? ? true : @unique
end
private
def generate_key
obj_info = YAML.load_dj(handler)
key_info = [obj_info.object.name, obj_info.method_name, obj_info.args]
self.job_uniq_key = key_info.to_param
end
def verify_uniqueness
similar_jobs = Delayed::Job.where(locked_by: nil,
job_uniq_key: job_uniq_key,
queue: queue)
return unless similar_jobs.any?
errors.add(:base, "Similar job is already in queue: #{similar_jobs.to_a}")
end
end
# Soft delete to keep successful jobs
# Sample: my_job.destroy ==> soft delete
# Sample: my_job.hard_destroy ==> hard delete
# Sample: Delayed::Job.include_completed => include completed jobs
Delayed::Job.class_eval do
default_scope -> { where(completed_at: nil) }
scope :include_completed, -> { unscope(where: :completed_at) }
alias_method :hard_destroy, :destroy
# soft destroy
def destroy
make_completed!
end
def make_completed!
update_column(:completed_at, Time.current)
end
end
# Reset running jobs when stopping worker: server is down/pod is deleted
Delayed::Worker.class_eval do
alias_method :stop_original, :stop
def stop
stop_original
unlock_current_jobs
end
private
def unlock_current_jobs
worker_jobs = Delayed::Job.where(locked_by: name)
worker_jobs.update_all(locked_by: nil, locked_at: nil)
end
end
# frozen_string_literal: true
class AddDelayedJobCustomAttributes < ActiveRecord::Migration[6.0]
def change
add_column :delayed_jobs, :job_uniq_key, :string, index: true
add_column :delayed_jobs, :across_uniq_key, :string
add_column :delayed_jobs, :completed_at, :datetime
add_index :delayed_jobs, %i[locked_at across_uniq_key]
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment