Skip to content

Embed URL

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.
Download ZIP
RSpec script attempting (and failing) to reproduce sidekiq-unique-jobs issue #27. https://github.com/mhenrixon/sidekiq-unique-jobs/issues/27
require 'spec_helper'
require 'sidekiq'
require 'sidekiq/middleware/chain'
require 'sidekiq/fetch'
require 'sidekiq/processor'
require 'redis/namespace'
# Redis endpoint (database 15 on localhost) shared by the raw client below
# and by Sidekiq's client configuration.
redis_url = 'redis://localhost/15'

# Redis.new accepts :url directly. Redis.connect was deprecated in redis-rb 3.x
# and removed in 4.0, so it is avoided here.
client = Redis.new(:url => redis_url)

# Namespaced handle used by the spec; the 'testish' namespace matches the one
# handed to Sidekiq below.
REDIS = Redis::Namespace.new('testish', :redis => client)

Sidekiq.configure_client do |config|
  config.redis = { :url => redis_url, :namespace => 'testish' }
end

require 'sidekiq/util'

# Only log messages at ERROR level or above.
Sidekiq.logger.level = Logger::ERROR

# Declares (or reopens, if already loaded) Sidekiq::Shutdown as an Interrupt.
class Sidekiq::Shutdown < Interrupt; end

# Error raised by the test workers in this file to simulate job failures.
class TestError < RuntimeError; end
# Non-unique worker on :first_queue. It sends QUIT to the current process when
# given a non-String argument, and otherwise fails at random.
class TestWorkerA
  include Sidekiq::Worker

  sidekiq_options queue: :first_queue, unique: false

  # @param arg [String] job payload; only its class is inspected
  # @raise [TestError] whenever rand yields a value below 0.3
  def perform(arg)
    if arg.class != String
      puts "TestWorkerA expects a String argument!"
      Process.kill "QUIT", Process.pid
    end

    should_fail = rand < 0.3
    raise TestError, 'Dammit Thelma!' if should_fail
  end
end
# Unique worker on :second_queue. It sends QUIT to the current process when
# given a non-integer argument, and otherwise fails at random.
class TestWorkerB
  include Sidekiq::Worker

  sidekiq_options queue: :second_queue, unique: true

  # @param arg [Integer] job payload; only its type is inspected
  # @raise [TestError] whenever rand yields a value below 0.3
  def perform(arg)
    # Integer instead of Fixnum: the Fixnum constant was deprecated in Ruby 2.4
    # and removed in 3.2, where referencing it raises NameError. is_a?(Integer)
    # also holds on older Rubies, where Fixnum is a subclass of Integer.
    unless arg.is_a?(Integer)
      puts "TestWorkerB expects a Fixnum argument!"
      Process.kill "QUIT", Process.pid
    end

    raise TestError, "Bye, honey. I won't wait up." if rand < 0.3
  end
end
describe 'requeueing', :focus do
# One-time setup: the queue names to fetch from and the namespaced Redis handle.
before(:all) do
  @queues = ['first_queue', 'second_queue']
  @redis = REDIS
end
# Per-example setup: empty the Redis database, build the boss/actor doubles
# handed to Sidekiq::Processor, and stub Sidekiq's middleware chains so they
# include the sidekiq-unique-jobs middleware.
before :each do
  @redis.flushdb

  # Actor double that accepts the two messages stubbed below.
  @actor = double('actor')
  [:real_thread, :processor_done].each do |message|
    allow(@actor).to receive(message)
  end

  # Boss double whose #async hands back the actor double.
  @boss = double('boss')
  allow(@boss).to receive(:async).and_return(@actor)

  # Server side: Sidekiq's default chain plus the unique-jobs server middleware.
  allow(Sidekiq).to receive(:server_middleware) {
    Sidekiq::Processor.default_middleware.tap do |server_chain|
      require 'sidekiq-unique-jobs/middleware/server/unique_jobs'
      server_chain.add SidekiqUniqueJobs::Middleware::Server::UniqueJobs
    end
  }

  # Client side: a fresh chain holding only the unique-jobs client middleware.
  allow(Sidekiq).to receive(:client_middleware) {
    Sidekiq::Middleware::Chain.new.tap do |client_chain|
      require 'sidekiq-unique-jobs/middleware/client/unique_jobs'
      client_chain.add SidekiqUniqueJobs::Middleware::Client::UniqueJobs
    end
  }
end
# Moves the lowest-scored entry of the 'retry' sorted set back onto its queue.
#
# Does nothing when the retry set is empty. The job is pushed only if this call
# actually removed the entry from the set.
#
# @return [Object, nil] the result of Sidekiq::Client.push, or nil when nothing
#   was re-enqueued
def poll_retries
  retry_set = 'retry'

  # Ask Redis for a single member instead of loading the whole set each call.
  message = @redis.zrangebyscore(retry_set, '-inf', '+inf', limit: [0, 1]).first
  return unless message

  Sidekiq::Client.push(Sidekiq.load_json(message)) if @redis.zrem(retry_set, message)
end
# Builds a Sidekiq::BasicFetch over the spec's queues and asks it for one unit
# of work.
#
# @return whatever BasicFetch#retrieve_work returns
def fetch_work
  fetcher = Sidekiq::BasicFetch.new({ queues: @queues })
  fetcher.retrieve_work
end
# Enqueues 500 jobs for each worker, then alternates between re-enqueueing
# retries and processing fetched work until the queues are drained.
it 're-runs the jobs successfully' do
  500.times { TestWorkerA.perform_async(rand(1..100).to_s) }
  500.times { TestWorkerB.perform_async(rand(1..100)) }

  puts "KEYS: #{@redis.keys '*'}"
  expect(@boss).to receive(:async)

  # NOTE(review): the retry threshold is 1, not 0, so a single remaining retry
  # does not keep either loop going — confirm this is intended.
  retries_pending = -> { Sidekiq::RetrySet.new.size > 1 }
  jobs_queued = lambda do
    %w(first_queue second_queue).any? { |queue_name| Sidekiq::Queue.new(queue_name).size > 0 }
  end

  runs = 0
  while jobs_queued.call || retries_pending.call
    puts "Jobs to retry: #{Sidekiq::RetrySet.new.size}"

    # Push pending retries back onto their queues before fetching again.
    poll_retries while retries_pending.call

    # Process everything currently fetchable; errors raised while processing
    # are printed and do not stop the loop.
    loop do
      work = fetch_work
      break unless work

      begin
        runs += 1
        Sidekiq::Processor.new(@boss).process(work)
      rescue => error
        puts "ERROR ==> #{error}"
      end
    end
  end

  puts "**** Number of runs: #{runs}"
end
end # requeueing
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Something went wrong with that request. Please try again.