public
Last active

  • Download Gist
statused_worker.rb
Ruby
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
require "digest/md5"
require "resque-retry"
require "resque-loner"
 
#
# Statused Worker Example:
#
# class MyStatusedWorker < StatusedWorker
# def self.job_identity_arguments(user_id, credit_card_number, attributes = {})
# [user_id]
# end
#
# # RESQUE api change: define perform at instance level
# def perform(user_id, credit_card_number, attributes = {})
# sleep(5)
# result = do_job(...)
# set_status(:success => result.success?, :errors => result.errors)
# end
# end
#
# user_id = 1
# Resque.enqueue(MyStatusedWorker, user_id, "4123-5682-3821-1111")
# MyStatusedWorker.in_progress?(user_id) # => true
# MyStatusedWorker.status(user_id) # => nil
# sleep(5)
# MyStatusedWorker.in_progress?(user_id) # => false
# MyStatusedWorker.status(user_id) # => {:success => false, :errors => ["Credit card not valid"]}
#
#
 
# Base class for Resque jobs that publish a result ("status") to Redis so the
# enqueuing side can poll for it.
#
# Subclasses must:
#   * override +self.job_identity_arguments+ to return the subset of the job
#     arguments that identifies the job (used for the status key, the
#     resque-retry identifier and the resque-loner uniqueness key);
#   * define an INSTANCE method +perform+ and call +set_status+ from it.
#
# The status is stored as JSON under "resque:statuses:<md5>" and is deleted
# right before each run of the job.
class StatusedWorker
  include Resque::Plugins::UniqueJob
  extend Resque::Plugins::Retry

  # Arguments the job instance was built with (see #initialize).
  attr_accessor :arguments

  #
  # API
  #

  # Returns true when Resque.enqueued? reports the job as queued, or when a
  # value exists under the resque-retry key for these arguments.
  def self.in_progress?(*args)
    !!(Resque.enqueued?(self, *args) || (retriable? && redis.get(redis_retry_key(*args))))
  end

  # True when this class has been extended with Resque::Plugins::Retry.
  # Uses plain Ruby (`is_a?` on the class object checks its singleton
  # ancestors) instead of `extended_by`, which is not part of core Ruby.
  def self.retriable?
    is_a?(Resque::Plugins::Retry)
  end

  # Resque entry point: clears any previous status, then delegates to the
  # instance-level #perform of a fresh worker.
  def self.perform(*args)
    delete_status(*args)
    new(*args).perform(*args)
  end

  # Returns the stored status as an indifferent-access hash, or nil when no
  # status has been stored for these arguments.
  def self.status(*args)
    raw = redis.get(status_key(*args))
    raw ? JSON.parse(raw).with_indifferent_access : nil
  end

  # Redis key holding the status; derived from the class name and the
  # identity arguments only.
  def self.status_key(*args)
    hex = Digest::MD5.hexdigest({:class => to_s, :args => job_identity_arguments(*args)}.to_json)
    "resque:statuses:#{hex}"
  end

  # Removes the stored status for these arguments.
  def self.delete_status(*args)
    redis.del(status_key(*args))
  end

  # Subclasses do not retry by default.
  def self.inherited(base)
    # Keep the hook chain intact: modules this class is extended with may
    # define their own `inherited` (resque-retry appears to - TODO confirm
    # for the installed version).
    super
    base.instance_variable_set(:@retry_limit, 0) # no retry by default
  end

  # Template method: must return the array of arguments identifying the job.
  #
  # Raises NotImplementedError unless overridden.
  def self.job_identity_arguments(*args)
    # `self` (not `self.class`, which is always Class here) names the worker.
    raise NotImplementedError, "#{self}.job_identity_arguments suppose to be overwritten"
  end

  # Guard for workers that still define `self.perform` and call `set_status`
  # at class level.
  def self.set_status(*args)
    raise "You can not use StatusedWorker.perform, use StatusedWorker#perform instead."
  end

  # key to identify job for resque-retry
  def self.identifier(*args)
    job_identity_arguments(*args).inspect
  end

  # key to identify job for resque-loner
  def self.redis_key(payload)
    args = payload[:args] || payload["args"]
    # Work on a copy: the caller may still use `payload` (with the full
    # argument list) after asking for the key.
    identity_payload = payload.reject { |key, _| key.to_s == "args" }
    identity_payload[:args] = job_identity_arguments(*args)
    super(identity_payload)
  end

  # Redis connection used for every status read/write.
  def self.redis
    Resque.redis
  end

  # Template method: the actual job body. Subclasses must override it.
  #
  # Raises NotImplementedError unless overridden.
  def perform(*args)
    raise NotImplementedError, "#{self.class}#perform(*args) suppose to be overwritten"
  end
  # Former (misspelled) name, kept so existing callers keep working.
  alias_method :perfrom, :perform

  # Identity arguments of this particular job instance.
  def job_identity_arguments
    self.class.job_identity_arguments(*arguments)
  end

  # Stores +status+ (anything responding to #to_json) as the job's status.
  def set_status(status)
    Rails.logger.debug("Set worker status #{self.class} #{job_identity_arguments.inspect} to #{status.inspect}")
    redis.set(status_key, status.to_json)
  end

  #
  # Implementation
  #

  protected

  # Remembers the job arguments so instance helpers can derive keys from them.
  def initialize(*args)
    self.arguments = args
  end

  def status_key
    self.class.status_key(*arguments)
  end

  def delete_status
    self.class.delete_status(*arguments)
  end

  def redis
    self.class.redis
  end
end

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.