Last active
March 20, 2020 10:36
-
-
Save owen2345/cfc8f75cfec987fe843d41795102af2b to your computer and use it in GitHub Desktop.
Delayed job: uniqueness support, sequential execution support, keep successful jobs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# frozen_string_literal: true | |
# config/initializers/delayed_job_ext.rb | |
# unique jobs across workers | |
# Sample: my_model.delay(across_uniq_key: 'article-19').create | |
# Sample: my_model.delay(across_uniq_key: 'article-19').update | |
# Sample: my_model.delay(across_uniq_key: 'article-19').destroy | |
# ==> the jobs will be processed in serial, not in parallel: | |
# can not call update before create or run both at the same time | |
Delayed::Job.class_eval do | |
class << self | |
alias_method :ready_to_run_original, :ready_to_run | |
def ready_to_run(*args) | |
query = ready_to_run_original(*args) | |
current_keys = current_across_job_keys | |
return query unless current_keys.any? | |
exclude_current_across_jobs(query, current_keys) | |
end | |
private | |
def current_across_job_keys | |
Delayed::Job.unscoped.where.not(locked_at: nil, across_uniq_key: nil) | |
.pluck(:across_uniq_key) | |
end | |
def exclude_current_across_jobs(query, current_across_keys) | |
query_filter = '(locked_at is NULL AND across_uniq_key not in (?)) OR | |
across_uniq_key is NULL OR locked_at is not NULL' | |
query.where(query_filter, current_across_keys) | |
end | |
end | |
end | |
# unique pending jobs (Default true): skip if there is a similar pending job | |
# Sample: my_model.delay(unique: false).greeting(msg: 'Hello') | |
# Sample: my_model.delay(unique: true).greeting(msg: 'Hello') | |
Delayed::Job.class_eval do | |
attr_writer :unique # boolean: true/false | |
before_validation :generate_key, if: :unique | |
validate :verify_uniqueness, if: :unique | |
# default unique: true | |
def unique | |
@unique.nil? ? true : @unique | |
end | |
private | |
def generate_key | |
obj_info = YAML.load_dj(handler) | |
key_info = [obj_info.object.name, obj_info.method_name, obj_info.args] | |
self.job_uniq_key = key_info.to_param | |
end | |
def verify_uniqueness | |
similar_jobs = Delayed::Job.where(locked_by: nil, | |
job_uniq_key: job_uniq_key, | |
queue: queue) | |
return unless similar_jobs.any? | |
errors.add(:base, "Similar job is already in queue: #{similar_jobs.to_a}") | |
end | |
end | |
# Soft delete to keep successful jobs | |
# Sample: my_job.destroy ==> soft delete | |
# Sample: my_job.hard_destroy ==> hard delete | |
# Sample: Delayed::Job.include_completed => include completed jobs | |
Delayed::Job.class_eval do | |
default_scope -> { where(completed_at: nil) } | |
scope :include_completed, -> { unscope(where: :completed_at) } | |
alias_method :hard_destroy, :destroy | |
# soft destroy | |
def destroy | |
make_completed! | |
end | |
def make_completed! | |
update_column(:completed_at, Time.current) | |
end | |
end | |
# Reset running jobs when stopping worker: server is down/pod is deleted | |
Delayed::Worker.class_eval do | |
alias_method :stop_original, :stop | |
def stop | |
stop_original | |
unlock_current_jobs | |
end | |
private | |
def unlock_current_jobs | |
worker_jobs = Delayed::Job.where(locked_by: name) | |
worker_jobs.update_all(locked_by: nil, locked_at: nil) | |
end | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# frozen_string_literal: true | |
class AddDelayedJobCustomAttributes < ActiveRecord::Migration[6.0] | |
def change | |
add_column :delayed_jobs, :job_uniq_key, :string, index: true | |
add_column :delayed_jobs, :across_uniq_key, :string | |
add_column :delayed_jobs, :completed_at, :datetime | |
add_index :delayed_jobs, %i[locked_at across_uniq_key] | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment