public

Demonstrate queue unsubscribe bug

  • Download Gist
Error #1: message delivered on channel after unsubscribe
1 2 3 4 5 6 7 8 9 10 11 12 13
> PUT msg 0
< GET msg 0 [waited 0.0s][511.25 reqs/sec]
> PUT msg 1
> PUT msg 2
< GET msg 1 [waited 0.0s][439.56 reqs/sec]
/Users/rud/.rvm/gems/ruby-1.9.2-head@bundler/gems/amq-client-0.8.3/lib/amq/client/async/consumer.rb:246:in `block in <class:Consumer>': undefined method `handle_delivery' for nil:NilClass (NoMethodError)
from /Users/rud/.rvm/gems/ruby-1.9.2-head@bundler/gems/amq-client-0.8.3/lib/amq/client/async/adapter.rb:535:in `call'
from /Users/rud/.rvm/gems/ruby-1.9.2-head@bundler/gems/amq-client-0.8.3/lib/amq/client/async/adapter.rb:535:in `receive_frameset'
from /Users/rud/.rvm/gems/ruby-1.9.2-head@bundler/gems/amq-client-0.8.3/lib/amq/client/async/adapter.rb:514:in `receive_frame'
from /Users/rud/.rvm/gems/ruby-1.9.2-head@bundler/gems/amq-client-0.8.3/lib/amq/client/async/adapters/event_machine.rb:326:in `receive_data'
from /Users/rud/.rvm/gems/ruby-1.9.2-head@bundler/gems/eventmachine-0.12.10/lib/eventmachine.rb:256:in `run_machine'
from /Users/rud/.rvm/gems/ruby-1.9.2-head@bundler/gems/eventmachine-0.12.10/lib/eventmachine.rb:256:in `run'
from demo-amqp-one-message-subscribers.rb:57:in `<main>'
Error #2: producer channel closes
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/Users/rud/.rvm/gems/ruby-1.9.2-head@bundler/gems/amq-client-0.8.3/lib/amq/client/async/adapter.rb:244:in `send_frame': Trying to send frame through a closed connection. Frame is #<AMQ::Protocol::MethodFrame:0x007fa6d29a35f0 @payload="\x00<\x00(\x00\x00\x00\x1Ftest.rapid-queue-unsubscription\x02", @channel=1> (AMQ::Client::ConnectionClosedError)
from /Users/rud/.rvm/gems/ruby-1.9.2-head@bundler/gems/amq-client-0.8.3/lib/amq/client/async/adapter.rb:261:in `block (2 levels) in send_frameset'
from /Users/rud/.rvm/gems/ruby-1.9.2-head@bundler/gems/amq-client-0.8.3/lib/amq/client/async/adapter.rb:261:in `each'
from /Users/rud/.rvm/gems/ruby-1.9.2-head@bundler/gems/amq-client-0.8.3/lib/amq/client/async/adapter.rb:261:in `block in send_frameset'
from <internal:prelude>:10:in `synchronize'
from /Users/rud/.rvm/gems/ruby-1.9.2-head@bundler/gems/amq-client-0.8.3/lib/amq/client/async/channel.rb:100:in `synchronize'
from /Users/rud/.rvm/gems/ruby-1.9.2-head@bundler/gems/amq-client-0.8.3/lib/amq/client/async/adapter.rb:260:in `send_frameset'
from /Users/rud/.rvm/gems/ruby-1.9.2-head@bundler/gems/amq-client-0.8.3/lib/amq/client/async/exchange.rb:152:in `publish'
from /Users/rud/.rvm/gems/ruby-1.9.2-head@bundler/gems/amqp-0.8.0/lib/amqp/exchange.rb:482:in `block in publish'
from /Users/rud/.rvm/gems/ruby-1.9.2-head@bundler/gems/eventmachine-0.12.10/lib/em/deferrable.rb:47:in `call'
from /Users/rud/.rvm/gems/ruby-1.9.2-head@bundler/gems/eventmachine-0.12.10/lib/em/deferrable.rb:47:in `callback'
from /Users/rud/.rvm/gems/ruby-1.9.2-head@bundler/gems/amqp-0.8.0/lib/amqp/channel.rb:905:in `once_open'
from /Users/rud/.rvm/gems/ruby-1.9.2-head@bundler/gems/amqp-0.8.0/lib/amqp/exchange.rb:479:in `publish'
from demo-amqp-one-message-subscribers.rb:16:in `block in start_producer'
from /Users/rud/.rvm/gems/ruby-1.9.2-head@bundler/gems/eventmachine-0.12.10/lib/em/timers.rb:51:in `call'
from /Users/rud/.rvm/gems/ruby-1.9.2-head@bundler/gems/eventmachine-0.12.10/lib/em/timers.rb:51:in `fire'
from /Users/rud/.rvm/gems/ruby-1.9.2-head@bundler/gems/eventmachine-0.12.10/lib/eventmachine.rb:256:in `call'
from /Users/rud/.rvm/gems/ruby-1.9.2-head@bundler/gems/eventmachine-0.12.10/lib/eventmachine.rb:256:in `run_machine'
from /Users/rud/.rvm/gems/ruby-1.9.2-head@bundler/gems/eventmachine-0.12.10/lib/eventmachine.rb:256:in `run'
from demo-amqp-one-message-subscribers.rb:45:in `<main>'
Gemfile
Ruby
1 2 3 4
source 'http://rubygems.org'
 
gem 'amqp'
gem 'eventmachine'
Version info
1 2 3
OS X 10.7.1
ruby 1.9.2p312 (2011-08-11 revision 32926) [x86_64-darwin11.1.0]
RabbitMQ 2.6.1
demo-amqp-one-message-subscribers.rb
Ruby
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
require 'bundler'
Bundler.setup
 
require 'amqp'
 
puts "single-message consumer listening to rapid producer"
 
QUEUE_NAME = 'test.rapid-queue-unsubscription'
PRODUCE_RATE = 1.0/10
CONSUME_RATE = 1.0/9
 
def start_producer
exchange = AMQP::Exchange.new(AMQP::Channel.new, :direct, "")
 
n = 0
EM::PeriodicTimer.new(PRODUCE_RATE) do
message = "msg #{n}"
exchange.publish(message,
:immediate => true, # IMPORTANT, messages are dropped if nobody listening now
:routing_key => QUEUE_NAME)
puts "> PUT #{message}"
n += 1
end
end
 
def start_consumer
 
EM::PeriodicTimer.new(CONSUME_RATE) do
 
started = Time.now
AMQP::Channel.new do |channel_consumer|
channel_consumer.prefetch(1)
tick_queue = channel_consumer.queue(QUEUE_NAME)
 
consumer = AMQP::Consumer.new(channel_consumer, tick_queue, nil, exclusive = false, no_ack = true)
consumer.on_delivery do |_, message|
 
took = Time.now - started
puts "< GET #{message} [waited #{took.round(2)}s][#{(1.0/took).round(2)} reqs/sec]"
 
consumer.cancel
channel_consumer.close
end
consumer.consume
end
end
end
 
EM.run do
EM.set_quantum(50)
 
start_producer
start_consumer
end

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.