Skip to content

Embed URL

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
require "digest/md5"
require "resque-retry"
require "resque-loner"
#
# Statused Worker Example:
#
# class MyStatusedWorker < StatusedWorker
# def self.job_identity_arguments(user_id, credit_card_number, attributes = {})
# [user_id]
# end
#
# # RESQUE api change: define perform at instance level
# def perform(user_id, credit_card_number, attributes = {})
# sleep(5)
# result = do_job(...)
# set_status(:success => result.success?, :errors => result.errors)
# end
# end
#
# user_id = 1
# Resque.enqueue(MyStatusedWorker, user_id, "4123-5682-3821-1111")
# MyStatusedWorker.in_progress?(user_id) # => true
# MyStatusedWorker.status(user_id) # => nil
# sleep(5)
# MyStatusedWorker.in_progress?(user_id) # => false
# MyStatusedWorker.status(user_id) # => {:success => false, :errors => ["Credit card not valid"]}
#
#
# Base class for Resque workers that publish a result ("status") to Redis.
#
# Subclasses must:
#   * override StatusedWorker.job_identity_arguments to pick the subset of the
#     job arguments that identifies a job, and
#   * define an instance-level #perform that does the work and (optionally)
#     calls #set_status with a JSON-serializable value.
#
# The status is stored under a key derived from the worker class name and the
# identity arguments (see StatusedWorker.status_key).
class StatusedWorker
  include Resque::Plugins::UniqueJob
  extend Resque::Plugins::Retry

  # The full argument list the job was started with.
  attr_accessor :arguments

  #
  # API
  #

  # True when a job for these arguments is queued or, for retriable workers,
  # when a value is stored under its retry key.
  # NOTE(review): redis_retry_key is not defined in this file; presumably it
  # is provided by Resque::Plugins::Retry - confirm.
  def self.in_progress?(*args)
    !!(Resque.enqueued?(self, *args) || (retriable? && Resque.redis.get(redis_retry_key(*args))))
  end

  # True when this class has been extended with Resque::Plugins::Retry.
  # Uses core Ruby introspection rather than the non-core `extended_by`.
  def self.retriable?
    singleton_class.include?(Resque::Plugins::Retry)
  end

  # Class-level entry point: clears any previous status for these arguments,
  # then runs the instance-level #perform on a fresh worker.
  def self.perform(*args)
    delete_status(*args)
    new(*args).perform(*args)
  end

  # Returns the stored status parsed from JSON (with indifferent access),
  # or nil when nothing is stored.
  def self.status(*args)
    result = Resque.redis.get(status_key(*args))
    result ? JSON.parse(result).with_indifferent_access : nil
  end

  # Redis key holding the status: an MD5 over the class name and the identity
  # arguments, so only the identity arguments influence the key.
  def self.status_key(*args)
    hex = Digest::MD5.hexdigest({:class => to_s, :args => job_identity_arguments(*args)}.to_json)
    "resque:statuses:#{hex}"
  end

  # Removes the stored status for these arguments.
  def self.delete_status(*args)
    redis.del(status_key(*args))
  end

  # Every subclass starts with @retry_limit = 0 (no retry by default).
  def self.inherited(base)
    super # keep the inherited-hook chain intact
    base.instance_variable_set(:@retry_limit, 0)
  end

  # Abstract: return the subset of +args+ that identifies a job.
  #
  # Raises NotImplementedError unless overridden by the subclass.
  def self.job_identity_arguments(*args)
    # `self` (not `self.class`, which is always Class here) names the worker.
    raise NotImplementedError, "#{self}.job_identity_arguments suppose to be overwritten"
  end

  # Class-level set_status is deliberately unusable: the status must be set
  # from the instance-level #perform.
  def self.set_status(*args)
    raise "You can not use StatusedWorker.perform, use StatusedWorker#perform instead."
  end

  # key to identify job for resque-retry
  def self.identifier(*args)
    job_identity_arguments(*args).inspect
  end

  # key to identify job for resque-loner
  #
  # Replaces the arguments with the identity arguments before delegating to
  # the inherited implementation. Works on a merged copy so the caller's
  # payload hash is left untouched.
  def self.redis_key(payload)
    full_args = payload[:args] || payload["args"]
    super(payload.merge(:args => job_identity_arguments(*full_args)))
  end

  # Redis connection used by the class-level helpers.
  def self.redis
    Resque.redis
  end

  # Remembers the job arguments so instance helpers can derive keys from them.
  def initialize(*args)
    self.arguments = args
  end

  # Abstract: do the actual work. Invoked by StatusedWorker.perform.
  #
  # Raises NotImplementedError unless overridden by the subclass.
  def perform(*args)
    raise NotImplementedError, "#{self.class}#perform(*args) suppose to be overwritten"
  end

  # Historical misspelling of #perform, kept so existing references still
  # resolve and still raise NotImplementedError.
  alias_method :perfrom, :perform

  # Identity arguments for the job this instance was created for.
  def job_identity_arguments
    self.class.job_identity_arguments(*arguments)
  end

  # Logs the new status at debug level and stores it in Redis as JSON.
  def set_status(status)
    Rails.logger.debug("Set worker status #{self.class} #{job_identity_arguments.inspect} to #{status.inspect}")
    Resque.redis.set(status_key, status.to_json)
  end

  #
  # Implementation
  #
  # Only instance methods are affected by `protected`; the `def self.` helpers
  # above are public regardless of where they are written.
  protected

  def status_key
    self.class.status_key(*arguments)
  end

  def delete_status
    self.class.delete_status(*arguments)
  end

  def redis
    self.class.redis
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Something went wrong with that request. Please try again.