Created
July 26, 2012 17:35
-
-
Save bluemont/3183383 to your computer and use it in GitHub Desktop.
Shared behavior for interdependent Resque worker tasks
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Derive your workers from BaseWorker, but don't use it directly. | |
class BaseWorker | |
def self.perform(params) | |
document = find_document(params) | |
return if recently_processed?(document, params) | |
pre_process(document, params) or return | |
process_and_mark_as_processed(document, params) or return | |
post_process(document, params) | |
end | |
# assumes Mongoid (Origin) query API | |
def self.find_document(params) | |
class_name = params['class_name'] or raise "class_name required" | |
uid = params['uid'] or raise "uid required" | |
klass = class_name.constantize | |
klass.find_by(uid: uid) | |
end | |
# assumes document has: `field :log, type: Array` | |
def self.recently_processed?(document, params) | |
log = document.log | |
log && log.any? { |k, v| k == worker_tag } | |
end | |
# Implement pre_process in child classes! | |
def self.pre_process(document, params) | |
true | |
end | |
def self.process_and_mark_as_processed(document, params) | |
start_time = Time.now | |
process(document, params) or return | |
mark_as_recently_processed(document, start_time) | |
true | |
end | |
# Implement process in child classes! | |
def self.process(document, params) | |
raise NotImplementedError | |
end | |
def self.mark_as_recently_processed(document, start_time) | |
document.push(:log, [worker_tag, start_time, Time.now]) | |
end | |
# Implement post_process in child classes! | |
def self.post_process(document, params) | |
true | |
end | |
# assumes document has: `field :log, type: Array` | |
# TODO: Consider refactoring by using `recently_processed?` | |
def self.check_prereq(worker_class, document, params) | |
log = document.log | |
return true if log && log.any? { |k, v| k == worker_class.worker_tag } | |
queue_prereq_worker(worker_class, document, params) | |
false | |
end | |
def self.check_prereqs(worker_classes, document, params) | |
worker_classes.shuffle.each do |worker_class| | |
check_prereq(worker_class, document, params) or return | |
end | |
true | |
end | |
def self.queue_prereq_worker(worker_class, document, params) | |
if params['run_pre'] | |
Resque.enqueue(worker_class, params) | |
end | |
true | |
end | |
def self.queue_post_worker(worker_class, document, params) | |
if params['run_post'] | |
Resque.enqueue(worker_class, params) | |
end | |
true | |
end | |
def self.queue_post_workers(worker_classes, document, params) | |
worker_classes.shuffle.each do |worker_class| | |
queue_post_worker(worker_class, document, params) | |
end | |
true | |
end | |
# Implement worker_tag in child classes! | |
def self.worker_tag | |
raise NotImplementedError | |
end | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Resque.enqueue(WorkerC, class_name: "BlogPost", uid: "XYZ0123", | |
run_pre: true, run_post: false) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class WorkerC < BaseWorker | |
@queue = 'default' | |
def self.worker_tag; :worker_c end | |
def self.pre_process(document, params) | |
check_prereqs([ | |
WorkerA, | |
WorkerB | |
], document, params) | |
end | |
def self.process(document, params) | |
ExampleService.do_something!(document) | |
true | |
end | |
def self.post_process(document, params) | |
queue_post_worker(WorkerD, document, params) | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
[WorkerA, WorkerB] --> WorkerC --> WorkerD