Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Some Resque helpers
# There are Minitest tests for reliably_enqueue below the module
# Note that this isn't well tested because it was used briefly for maintenance and we ended up killing Resque
# I apologize in advance for any off-by-one errors
# Maintenance helpers bolted onto the Resque module: retrying enqueues,
# fork-based bulk enqueueing, and utilities for pruning/requeuing the
# failure list.
module Resque
  # Seconds to pause between enqueue attempts in reliably_enqueue.
  SLEEP_BETWEEN_RETRY = 0.1
  # Total number of enqueue attempts reliably_enqueue makes before giving up.
  RETRY = 3
  # Payload count at which fast_enqueue starts forking; also the upper bound
  # on the number of worker processes it creates.
  NUM_PROCS_FOR_FAST_ENQUEUE = 10

  class << self
    # Enqueues a job, retrying when Redis is unreachable or times out.
    #
    # klass   - job class handed to Resque.enqueue.
    # payload - job arguments, forwarded as-is.
    #
    # Returns whatever Resque.enqueue returns. Makes at most RETRY attempts,
    # sleeping SLEEP_BETWEEN_RETRY seconds between them, and re-raises the
    # last Redis::CannotConnectError / Redis::TimeoutError once they are used
    # up. Any other exception propagates immediately.
    def reliably_enqueue(klass, *payload)
      attempts_left = RETRY
      begin
        Sidewalk.log "Enqueuing #{klass}: #{payload.inspect}"
        Resque.enqueue(klass, *payload)
      rescue Redis::CannotConnectError, Redis::TimeoutError
        attempts_left -= 1
        raise if attempts_left <= 0

        sleep SLEEP_BETWEEN_RETRY
        retry
      end
    end

    # Enqueues many payloads for one job class, splitting the work across
    # forked processes when there are enough payloads to justify it.
    #
    # Note: Spawning multiple threads did not speed this up at all ...maybe a
    # ruby Resque client library shortcoming? For now, forks.
    #
    # klass    - job class to enqueue.
    # payloads - array of payloads; each one is splatted into the job args.
    #
    # Returns payloads when run inline, or the array of child pids when forked.
    def fast_enqueue(klass, payloads)
      if payloads.length < NUM_PROCS_FOR_FAST_ENQUEUE
        payloads.each { |payload| Resque.reliably_enqueue(klass, *payload) }
      else
        # Ceiling division: guarantees at most NUM_PROCS_FOR_FAST_ENQUEUE
        # batches (floor division could yield almost twice as many forks).
        slice_size = (payloads.length + NUM_PROCS_FOR_FAST_ENQUEUE - 1) / NUM_PROCS_FOR_FAST_ENQUEUE
        pids = []

        # Drop the parent's database connection before forking so children do
        # not inherit it; each child opens its own from the saved config.
        config = ActiveRecord::Base.remove_connection
        begin
          payloads.each_slice(slice_size) do |batch|
            pids << fork do
              REDIS.client.reconnect
              ActiveRecord::Base.establish_connection(config)
              puts "Created process for #{batch.length} payloads"
              batch.each { |payload| Resque.reliably_enqueue(klass, *payload) }
              ActiveRecord::Base.remove_connection
            end
          end
        ensure
          # Always give the parent its connection back, even if a fork failed.
          ActiveRecord::Base.establish_connection(config)
        end

        pids.each do |pid|
          _, status = Process.waitpid2(pid)
          # A child that died mid-batch has left payloads un-enqueued; say so
          # instead of dropping the exit status on the floor.
          warn "fast_enqueue: worker did not finish cleanly (#{status})" unless status.success?
        end
      end
    end

    # Requeues every failed job (highest index first) and then empties the
    # failure list.
    def requeue_and_clear_all_failures
      (Resque::Failure.count - 1).downto(0) { |index| Resque::Failure.requeue(index) }
      Resque::Failure.clear
    end

    # Removes, without requeuing, every failed job whose payload class is klass.
    #
    # klass - class or class name; compared via klass.to_s.
    def clear_jobs_of_class(klass)
      class_name = klass.to_s
      sweep_failures(nil, false) { |job| job['payload']['class'] == class_name }
      nil
    end

    # Removes, without requeuing, every failed job whose 'exception' field
    # equals s.
    def remove_jobs_with_exception_string(s)
      sweep_failures(nil, false) { |job| job['exception'] == s }
      nil
    end

    # Requeues and removes the first n failed jobs (fewer if the failure list
    # is shorter than n).
    def requeue_and_clear_n_jobs(n)
      sweep_failures(n, true) do |job|
        puts "Requeuing #{job.inspect}"
        true
      end
      nil
    end

    # Requeues and removes up to n failed jobs whose payload class is klass,
    # leaving jobs of other classes untouched.
    def requeue_and_clear_n_jobs_of_class(n, klass)
      class_name = klass.to_s
      cleared = sweep_failures(n, true) do |job|
        matches = job['payload']['class'] == class_name
        puts(matches ? "Requeuing #{job.inspect}" : "Skipping #{job['payload']['class']}")
        matches
      end
      puts "Requeued #{cleared} jobs of class #{klass}"
    end

    private

    # Walks the failure list from the front and removes each job the block
    # selects, optionally requeuing it first.
    #
    # limit   - maximum number of jobs to remove, or nil for no limit.
    # requeue - when true, requeue a selected job before removing it.
    #
    # Relies on Resque::Failure.all(index) returning a single job hash, or
    # nil once index is past the end of the list. Removing a job shifts the
    # rest down, so the index only advances when a job is skipped.
    #
    # Returns the number of jobs removed.
    def sweep_failures(limit, requeue)
      index = 0
      removed = 0
      while (limit.nil? || removed < limit) && (job = Resque::Failure.all(index))
        if yield(job)
          Resque::Failure.requeue(index) if requeue
          Resque::Failure.remove(index)
          removed += 1
        else
          index += 1
        end
      end
      removed
    end
  end
end
# Minitest (spec style) tests for reliably_enqueue; stubbing is done with Mocha.
describe Resque do
  class FakeJob; end
  class FakeException < StandardError; end

  describe "reliably_enqueue" do
    before do
      # reliably_enqueue sleeps between attempts; stub it out so the retry
      # specs do not actually wait.
      Resque.stubs(:sleep)
    end

    it "should fail after exception is raised max times" do
      Resque.stubs(:enqueue).raises(Redis::CannotConnectError.new)
      ->{ Resque.reliably_enqueue FakeJob, 1 }.must_raise Redis::CannotConnectError
    end

    it "should try 3 times" do
      # A fourth attempt would surface FakeException instead of the Redis
      # error, so seeing CannotConnectError proves we stopped at three.
      Resque.stubs(:enqueue)
        .raises(Redis::CannotConnectError)
        .then.raises(Redis::CannotConnectError)
        .then.raises(Redis::CannotConnectError)
        .then.raises(FakeException.new)
      ->{ Resque.reliably_enqueue FakeJob, 1 }.must_raise Redis::CannotConnectError
    end

    it "should fail if uncaught exception is raised" do
      Resque.stubs(:enqueue).raises(FakeException.new)
      ->{ Resque.reliably_enqueue FakeJob, 1 }.must_raise FakeException
    end

    it "should execute once if a job is successfully enqueued" do
      # Use a real expectation: the previous `returns(@count += 1)` was
      # evaluated once while building the stub, so it could never detect
      # extra enqueue calls.
      Resque.expects(:enqueue).with(FakeJob, 1).once.returns(true)
      Resque.reliably_enqueue(FakeJob, 1).must_equal true
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.