Created

Embed URL

HTTPS clone URL

SSH clone URL

You can clone with HTTPS or SSH.

Download Gist

RSpec script attempting (and failing) to reproduce sidekiq-unique-jobs issue #27. https://github.com/mhenrixon/sidekiq-unique-jobs/issues/27

View sidekiq_unique_jobs_bug_spec.rb
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124
require 'spec_helper'
require 'sidekiq'
require 'sidekiq/middleware/chain'
require 'sidekiq/fetch'
require 'sidekiq/processor'
 
 
require 'redis/namespace'
redis_url = 'redis://localhost/15'
client = Redis.connect(:url => redis_url)
REDIS = Redis::Namespace.new('testish', :redis => client)
 
Sidekiq.configure_client do |config|
config.redis = { :url => redis_url, :namespace => 'testish' }
end
 
require 'sidekiq/util'
Sidekiq.logger.level = Logger::ERROR
 
class Sidekiq::Shutdown < Interrupt; end
 
class TestError < RuntimeError; end
 
class TestWorkerA
include Sidekiq::Worker
 
sidekiq_options queue: :first_queue, unique: false
 
def perform(arg)
unless arg.class == String
puts "TestWorkerA expects a String argument!"
Process.kill "QUIT", Process.pid
end
raise TestError.new 'Dammit Thelma!' if rand < 0.3
end
end
 
class TestWorkerB
include Sidekiq::Worker
 
sidekiq_options queue: :second_queue, unique: true
 
def perform(arg)
unless arg.class == Fixnum
puts "TestWorkerB expects a Fixnum argument!"
Process.kill "QUIT", Process.pid
end
raise TestError.new "Bye, honey. I won't wait up." if rand < 0.3
end
end
 
describe 'requeueing', :focus do
 
before :all do
@redis = REDIS
@queues = %w(first_queue second_queue)
end
 
before :each do
@redis.flushdb
 
@actor = double('actor')
allow(@actor).to receive(:real_thread)
allow(@actor).to receive(:processor_done)
 
@boss = double('boss')
allow(@boss).to receive(:async).and_return(@actor)
 
allow(Sidekiq).to receive(:server_middleware) {
chain = Sidekiq::Processor.default_middleware
require 'sidekiq-unique-jobs/middleware/server/unique_jobs'
chain.add SidekiqUniqueJobs::Middleware::Server::UniqueJobs
chain
}
allow(Sidekiq).to receive(:client_middleware) {
chain = Sidekiq::Middleware::Chain.new
require 'sidekiq-unique-jobs/middleware/client/unique_jobs'
chain.add SidekiqUniqueJobs::Middleware::Client::UniqueJobs
chain
}
end
 
def poll_retries
retry_set = 'retry'
message = @redis.zrangebyscore(retry_set, '-inf', '+inf').first
if @redis.zrem(retry_set, message)
Sidekiq::Client.push(Sidekiq.load_json(message))
end
end
def fetch_work
Sidekiq::BasicFetch.new({queues: @queues}).retrieve_work
end
 
it 're-runs the jobs successfully' do
500.times do
TestWorkerA.perform_async(rand(1..100).to_s)
end
500.times do
TestWorkerB.perform_async(rand(1..100))
end
puts "KEYS: #{@redis.keys '*'}"
 
expect(@boss).to receive(:async)
 
count = 0
while (Sidekiq::Queue.new('first_queue').size > 0 ||
Sidekiq::Queue.new('second_queue').size > 0 ||
Sidekiq::RetrySet.new.size > 1)
puts "Jobs to retry: #{Sidekiq::RetrySet.new.size}"
while (Sidekiq::RetrySet.new.size > 1)
poll_retries
end
while work = fetch_work
begin
count += 1
Sidekiq::Processor.new(@boss).process(work)
rescue => e
puts "ERROR ==> #{e}"
end
end
end
puts "**** Number of runs: #{count}"
end
end # requeueing
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Something went wrong with that request. Please try again.