Created
September 16, 2011 13:50
-
-
Save bogdan/1222167 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require "digest/md5" | |
require "resque-retry" | |
require "resque-loner" | |
# | |
# Statused Worker Example: | |
# | |
# class MyStatusedWorker < StatusedWorker | |
# def self.job_identity_arguments(user_id, credit_card_number, attributes = {}) | |
# [user_id] | |
# end | |
# | |
# # RESQUE api change: define perform at instance level | |
# def perform(user_id, credit_card_number, attributes = {})
# sleep(5) | |
# result = do_job(...) | |
# set_status(:success => result.success?, :errors => result.errors) | |
# end | |
# end | |
# | |
# user_id = 1 | |
# Resque.enqueue(MyStatusedWorker, user_id, "4123-5682-3821-1111") | |
# MyStatusedWorker.in_progress?(user_id) # => true | |
# MyStatusedWorker.status(user_id) # => nil | |
# sleep(5) | |
# MyStatusedWorker.in_progress?(user_id) # => false | |
# MyStatusedWorker.status(user_id) # => {:success => false, :errors => ["Credit card not valid"]} | |
# | |
# | |
class StatusedWorker | |
include Resque::Plugins::UniqueJob | |
extend Resque::Plugins::Retry | |
attr_accessor :arguments | |
# | |
# API | |
# | |
def self.in_progress?(*args) | |
!!(Resque.enqueued?(self, *args) || (self.retriable? && Resque.redis.get(redis_retry_key(*args)))) | |
end | |
def self.retriable? | |
self.extended_by.include?(Resque::Plugins::Retry) | |
end | |
def self.perform(*args) | |
delete_status(*args) | |
self.new(*args).perform(*args) | |
end | |
def self.status(*args) | |
result = Resque.redis.get(self.status_key(*args)) | |
result ? JSON.parse(result).with_indifferent_access : nil | |
end | |
def self.status_key(*args) | |
hex = Digest::MD5.hexdigest({:class => self.to_s, :args => job_identity_arguments(*args)}.to_json) | |
"resque:statuses:#{hex}" | |
end | |
def self.delete_status(*args) | |
self.redis.del(status_key(*args)) | |
end | |
def self.inherited(base) | |
base.class_eval do | |
@retry_limit = 0 # no retry by default | |
end | |
end | |
def self.job_identity_arguments(*args) | |
raise NotImplementedError, "#{self.class}#job_identity_arguments suppose to be overwritten" | |
end | |
def perfrom(*args) | |
raise NotImplementedError, "#{self.class}#perfrom(*args) suppose to be overwritten" | |
end | |
def job_identity_arguments | |
self.class.job_identity_arguments(*arguments) | |
end | |
def set_status(status) | |
Rails.logger.debug("Set worker status #{self.class} #{self.job_identity_arguments.inspect} to #{status.inspect}") | |
Resque.redis.set(self.status_key, status.to_json) | |
end | |
# | |
# Implementation | |
# | |
protected | |
def self.set_status(*args) | |
raise "You can not use StatusedWorker.perform, use StatusedWorker#perform instead." | |
end | |
# key to identify job for resque-retry | |
def self.identifier(*args) | |
job_identity_arguments(*args).inspect | |
end | |
# key to identify job for resque-loner | |
def self.redis_key(payload) | |
payload[:args] = job_identity_arguments(*(payload[:args] || payload["args"])) | |
super(payload) | |
end | |
def initialize(*args) | |
self.arguments = args | |
end | |
def status_key | |
self.class.status_key(*arguments) | |
end | |
def delete_status | |
self.class.delete_status(*arguments) | |
end | |
def self.redis | |
Resque.redis | |
end | |
def redis | |
self.class.redis | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment