Last active
August 29, 2015 14:02
-
-
Save notruthless/2c66bea57b8db39d6198 to your computer and use it in GitHub Desktop.
Recurring jobs in DelayedJob 3 and Rails 4
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Base class for Delayed::Job payloads that keep themselves scheduled on a
# fixed interval. Subclasses override #perform to do the real work.
#
# http://engineering.onlive.com/2014/06/06/scheduling-procrastination-with-delayedjob/
# (code inspired by https://gist.github.com/JoshMcKin/1648242)
#
# The single struct member, +options+, is the hash handed to .schedule_job or
# .queue_once; it is stored with the job and is available inside #perform.
class RecurringJob < Struct.new(:options)
  # Schedule this job (if you just want the job to run once, use .queue_once).
  #
  # options  - Hash (nil is treated as an empty hash):
  #            :interval   => seconds between runs (default: default_interval)
  #            :queue      => queue name; only one job is scheduled at a time
  #                           for any given queue (default: default_queue)
  #            :start_time => time for this first run only; it is not stored
  #                           in the job, later runs use :interval
  #            Any other keys are passed through to the underlying job.
  # this_job - the currently running Delayed::Job (if any); it is excluded
  #            when checking whether the queue already has a job.
  #
  # Returns the job already present in the queue, or the newly enqueued one.
  def self.schedule_job(options = {}, this_job = nil)
    # Work on a copy: the defaults added below and the removal of :start_time
    # must not leak into the hash the caller handed in.
    options = (options || {}).dup
    options[:interval] ||= default_interval
    options[:queue] ||= default_queue
    queue_name = options[:queue]

    other_job = next_scheduled_job(this_job, queue_name)
    if other_job
      Rails.logger.info "#{queue_name} job is already scheduled for #{other_job.run_at}."
      return other_job
    end

    # If a start time is specified, use it ONLY this time (to start); it is
    # removed so it is not passed on in the stored options.
    run_time = options.delete(:start_time)
    # to_i makes sure the interval is an integer (e.g. if sent in as 1.day).
    run_time ||= Time.now + options[:interval].to_i
    other_job = Delayed::Job.enqueue new(options), :run_at => run_time, :queue => queue_name
    Rails.logger.info "A new #{queue_name} job has been scheduled for #{other_job.run_at}."
    other_job
  end

  # Enqueue a single, non-recurring run.
  #
  # IMPORTANT: don't use the same queue name as a recurring job and DON'T
  # specify an :interval in the options, otherwise #before will reschedule it.
  def self.queue_once(options = {})
    Delayed::Job.enqueue new(options)
  end

  # Default number of seconds between runs: once a day.
  def self.default_interval
    1.day.to_i
  end

  # Default queue name: the name of the (sub)class.
  def self.default_queue
    name
  end

  # Return the first non-failed job in +queue_name+ (default: default_queue),
  # ignoring +this_job+ when one is given. Returns nil when there is none.
  def self.next_scheduled_job(this_job = nil, queue_name = nil)
    queue_name ||= default_queue
    clause = 'queue = ? AND failed_at IS NULL'
    binds = [queue_name]
    if this_job
      # Don't include the currently running job in the search. The clause is
      # rebuilt with + rather than << so a frozen string literal is never mutated.
      clause += ' AND id != ?'
      binds << this_job.id
    end
    Delayed::Job.where([clause, *binds]).first
  end

  # Given a job from the queue, parse the handler YAML and give back the
  # current interval. nil means no interval is set.
  def self.job_interval(job)
    # NOTE(review): on Psych 4+ YAML.load may refuse to build the job struct;
    # Delayed::Job#payload_object looks like the supported accessor - confirm.
    payload = YAML.load(job.handler)
    payload.options && payload.options[:interval]
  end

  # Given a job from the queue, parse the handler YAML, set the interval
  # (default_interval when nil is passed) and save the job.
  def self.set_job_interval(job, interval)
    interval ||= default_interval
    # NOTE(review): same YAML.load caveat as in .job_interval - confirm.
    payload = YAML.load(job.handler)
    payload.options ||= {}
    payload.options[:interval] = interval
    job.handler = payload.to_yaml
    job.save!
  end

  # Should be overridden by the real job; the base version only logs.
  def perform
    Rails.logger.debug("PERFORMING #{self.class.name} - #{options}")
  end

  # Delayed Job hook: log the failure together with the attempt count.
  def error(job, exception)
    Rails.logger.error("ERROR - attempts: #{job.attempts} (#{exception})")
  end

  # Delayed Job hook, run before #perform.
  # If an interval was specified, make sure there's a future job using the
  # same options as before. Otherwise this is a one-time run: don't reschedule.
  def before(job)
    if options && options[:interval]
      self.class.schedule_job(options, job)
    else
      Rails.logger.info("Not scheduling this #{self.class.name} to run again.")
    end
  end
end
# Example recurring job: RecurringJob supplies the scheduling, so this class
# only provides #perform to do the actual work.
#
# Custom options can be sent in (here an :app_id for a database model named
# App); they are passed on each time the job is scheduled.
class AppSyncJob < RecurringJob
  # Sync the single App matching options[:app_id], or every App when no
  # :app_id was given (or there are no options at all).
  def perform
    app_id = options[:app_id] if options
    apps_to_process =
      if app_id
        App.where(id: app_id)
      else
        App.all
      end
    apps_to_process.each(&:do_whatever_it_means_to_sync)
  end
end
# Usage examples.

# Run the job a single time to check a single app.
AppSyncJob.queue_once(app_id: App.first.id)

# Or set it up to run as a scheduled job, every 12 hours.
AppSyncJob.schedule_job(interval: 12.hours)

# Or set it up to run on a schedule for each particular app, using the
# (unique) name of the app as the name of the queue.
App.all.each { |app| AppSyncJob.schedule_job(interval: 12.hours, app_id: app.id, queue: app.name) }
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment