Skip to content

Instantly share code, notes, and snippets.

@celldee
Created September 11, 2009 07:27
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save celldee/185144 to your computer and use it in GitHub Desktop.
Save celldee/185144 to your computer and use it in GitHub Desktop.
Bunny Subscription Examples (versions <= 0.8.0)
# This is how Queue#subscribe will work in the next version of Bunny (v0.5.4).
# It's in the 'experimental' branch now.
#
# In order for subscription to work smoothly you need to use a combination of
# Queue#subscribe, Queue#unsubscribe, Queue#ack and Client#qos prefetch. The
# prefetch will allow a controlled number of messages to be released to a
# consumer, which means that Queue#unsubscribe and other methods can be run
# without errors due to out of sequence messages.
#
# Example 1 - Using :message_max where no. of messages > message_max
#
# Start Bunny instance
b = Bunny.new(:logging => true)
b.start
# Create queue
q = b.queue('subtest1')
# Make sure queue is empty
q.purge
# Set prefetch window (no. of messages that server will send at a time).
# Default is 1. RabbitMQ v1.6.0 only accepts :prefetch_count option. The
# other options have not been implemented yet.
b.qos()
# Publish some messages
10.times {q.publish('Here be another test message')}
# Subscribe to the queue. Must specify :ack => true otherwise
# prefetch will not work.
q.subscribe(:message_max => 5, :ack => true) {|msg| puts msg[:payload]}
# Check the queue status
s = q.status
puts "Queue: #{q.name}\nMessages: #{s[:message_count]}\nConsumers: #{s[:consumer_count]}"
#
# Example 2 - Using :timeout
#
# Copy set up stuff from Example 1. Prefetch is not needed.
# Subscribe to the queue
q.subscribe(:timeout => 10) {|msg| puts msg[:payload]}
# Check the queue status
s = q.status
puts "Queue: #{q.name}\nMessages: #{s[:message_count]}\nConsumers: #{s[:consumer_count]}"
#
# Example 3 - Only process the first message from the queue. N.B. More easily
# done with :message_max option
#
# Copy set up stuff from Example 1. Prefetch is needed.
# Subscribe to the queue. Prefetch is needed
q.subscribe(:consumer_tag => 'my_consumer', :ack => true) do |msg|
# Get first message
puts msg[:payload]
# Unsubscribe from queue
q.unsubscribe()
# Acknowledge the message
q.ack()
# Break out of the loop
break
end
# Check the queue status
s = q.status
puts "Queue: #{q.name}\nMessages: #{s[:message_count]}\nConsumers: #{s[:consumer_count]}"
#
# Example 4 - Process all messages from queue then stop when queue empty
#
b = Bunny.new
b.start()
b.qos()
q = b.queue()
10.times { q.publish('Hey Ho') }
cnt = 0
q.subscribe(:ack => true) do |msg|
cnt += 1
msg_cnt = q.message_count()
puts '************************'
puts "Message: #{cnt} - #{msg[:payload]}"
puts "#{msg_cnt} message(s) left in the queue"
puts '************************'
if msg_cnt == 0
q.unsubscribe()
q.ack()
break
end
end
#
# Example 5 - Display all of the information about a consumed message
#
b = Bunny.new()
b.start()
b.qos()
q = b.queue()
q.publish('Another message served up by RabbitMQ')
q.subscribe(:message_max => 1, :ack => true) do |msg|
puts
puts "Message Information"
puts '-------------------'
puts "Payload - #{msg[:payload]}"
puts
puts "Header - "
msg[:header].instance_variables.each do |var|
vname = var.to_s.sub(/@/, '')
val = eval("msg[:header]." + vname)
if val.is_a?(Hash)
puts " #{vname.capitalize} -"
val.each_pair do |k,v|
puts " #{k.capitalize}: #{v}"
end
else
puts " #{vname.capitalize}: #{val.to_s}"
end
end
puts
puts "Delivery details -"
msg[:delivery_details].each_pair do |k,v|
puts " #{k.capitalize}: #{v}"
end
puts
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment