Create a gist now

Instantly share code, notes, and snippets.

require "digest/md5"
require "resque-retry"
require "resque-loner"
#
# Statused Worker Example:
#
# class MyStatusedWorker < StatusedWorker
# def self.job_identity_arguments(user_id, credit_card_number, attributes = {})
# [user_id]
# end
#
# # RESQUE api change: define perform at instance level
# def perform(user_id, credit_card_number, , attributes)
# sleep(5)
# result = do_job(...)
# set_status(:success => result.success?, :errors => result.errors)
# end
# end
#
# user_id = 1
# Resque.enqueue(MyStatusedWorker, user_id, "4123-5682-3821-1111")
# MyStatusedWorker.in_progress?(user_id) # => true
# MyStatusedWorker.status(user_id) # => nil
# sleep(5)
# MyStatusedWorker.in_progress?(user_id) # => false
# MyStatusedWorker.status(user_id) # => {:success => false, :errors => ["Credit card not valid"]}
#
#
class StatusedWorker
include Resque::Plugins::UniqueJob
extend Resque::Plugins::Retry
attr_accessor :arguments
#
# API
#
def self.in_progress?(*args)
!!(Resque.enqueued?(self, *args) || (self.retriable? && Resque.redis.get(redis_retry_key(*args))))
end
def self.retriable?
self.extended_by.include?(Resque::Plugins::Retry)
end
def self.perform(*args)
delete_status(*args)
self.new(*args).perform(*args)
end
def self.status(*args)
result = Resque.redis.get(self.status_key(*args))
result ? JSON.parse(result).with_indifferent_access : nil
end
def self.status_key(*args)
hex = Digest::MD5.hexdigest({:class => self.to_s, :args => job_identity_arguments(*args)}.to_json)
"resque:statuses:#{hex}"
end
def self.delete_status(*args)
self.redis.del(status_key(*args))
end
def self.inherited(base)
base.class_eval do
@retry_limit = 0 # no retry by default
end
end
def self.job_identity_arguments(*args)
raise NotImplementedError, "#{self.class}#job_identity_arguments suppose to be overwritten"
end
def perfrom(*args)
raise NotImplementedError, "#{self.class}#perfrom(*args) suppose to be overwritten"
end
def job_identity_arguments
self.class.job_identity_arguments(*arguments)
end
def set_status(status)
Rails.logger.debug("Set worker status #{self.class} #{self.job_identity_arguments.inspect} to #{status.inspect}")
Resque.redis.set(self.status_key, status.to_json)
end
#
# Implementation
#
protected
def self.set_status(*args)
raise "You can not use StatusedWorker.perform, use StatusedWorker#perform instead."
end
# key to identify job for resque-retry
def self.identifier(*args)
job_identity_arguments(*args).inspect
end
# key to identify job for resque-loner
def self.redis_key(payload)
payload[:args] = job_identity_arguments(*(payload[:args] || payload["args"]))
super(payload)
end
def initialize(*args)
self.arguments = args
end
def status_key
self.class.status_key(*arguments)
end
def delete_status
self.class.delete_status(*arguments)
end
def self.redis
Resque.redis
end
def redis
self.class.redis
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment