Skip to content

Instantly share code, notes, and snippets.

@bluemont
Created July 26, 2012 17:35
Show Gist options
  • Save bluemont/3183383 to your computer and use it in GitHub Desktop.
Save bluemont/3183383 to your computer and use it in GitHub Desktop.
Shared behavior for interdependent Resque worker tasks
# Derive your workers from BaseWorker, but don't use it directly.
class BaseWorker
def self.perform(params)
document = find_document(params)
return if recently_processed?(document, params)
pre_process(document, params) or return
process_and_mark_as_processed(document, params) or return
post_process(document, params)
end
# assumes Mongoid (Origin) query API
def self.find_document(params)
class_name = params['class_name'] or raise "class_name required"
uid = params['uid'] or raise "uid required"
klass = class_name.constantize
klass.find_by(uid: uid)
end
# assumes document has: `field :log, type: Array`
def self.recently_processed?(document, params)
log = document.log
log && log.any? { |k, v| k == worker_tag }
end
# Implement pre_process in child classes!
def self.pre_process(document, params)
true
end
def self.process_and_mark_as_processed(document, params)
start_time = Time.now
process(document, params) or return
mark_as_recently_processed(document, start_time)
true
end
# Implement process in child classes!
def self.process(document, params)
raise NotImplementedError
end
def self.mark_as_recently_processed(document, start_time)
document.push(:log, [worker_tag, start_time, Time.now])
end
# Implement post_process in child classes!
def self.post_process(document, params)
true
end
# assumes document has: `field :log, type: Array`
# TODO: Consider refactoring by using `recently_processed?`
def self.check_prereq(worker_class, document, params)
log = document.log
return true if log && log.any? { |k, v| k == worker_class.worker_tag }
queue_prereq_worker(worker_class, document, params)
false
end
def self.check_prereqs(worker_classes, document, params)
worker_classes.shuffle.each do |worker_class|
check_prereq(worker_class, document, params) or return
end
true
end
def self.queue_prereq_worker(worker_class, document, params)
if params['run_pre']
Resque.enqueue(worker_class, params)
end
true
end
def self.queue_post_worker(worker_class, document, params)
if params['run_post']
Resque.enqueue(worker_class, params)
end
true
end
def self.queue_post_workers(worker_classes, document, params)
worker_classes.shuffle.each do |worker_class|
queue_post_worker(worker_class, document, params)
end
true
end
# Implement worker_tag in child classes!
def self.worker_tag
raise NotImplementedError
end
end
Resque.enqueue(WorkerC, class_name: "BlogPost", uid: "XYZ0123",
run_pre: true, run_post: false)
class WorkerC < BaseWorker
@queue = 'default'
def self.worker_tag; :worker_c end
def self.pre_process(document, params)
check_prereqs([
WorkerA,
WorkerB
], document, params)
end
def self.process(document, params)
ExampleService.do_something!(document)
true
end
def self.post_process(document, params)
queue_post_worker(WorkerD, document, params)
end
end
@bluemont
Copy link
Author

[WorkerA, WorkerB] --> WorkerC --> WorkerD

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment