Skip to content

Instantly share code, notes, and snippets.

@maxivak
Last active December 23, 2015 09:57
Show Gist options
  • Save maxivak/28805beccea0bca67516 to your computer and use it in GitHub Desktop.
Save maxivak/28805beccea0bca67516 to your computer and use it in GitHub Desktop.
RabbitMQ examples using Bunny gem

RabbitMQ and Bunny examples

Publish message

Expiration

  • expiration - TTL in milliseconds

# TTL = 5 sec
exchange.publish(msg_data.to_json, :routing_key => msg_key, :expiration => 5*1000)

Subscribe (read messages)

Assuming the queue is defined as:

conn = Bunny.new(:hostname => "rabbit.local", :user => "admin", :password => "pwd", :vhost=>'/')
conn.start

#
ch = conn.create_channel

# exchange
x = ch.topic('mytopic')
q = ch.queue('myq', :auto_delete => false, :exclusive => false, :durable=>false)
q.bind(x, :routing_key => '#')

Read a single message

use Bunny::Queue#pop (basic.get)

delivery_info, properties, msg = q.pop

Read one message and wait for the message

  • Bunny::Queue#pop_waiting - NOT WORK, REMOVED
begin                                                                                                 
  delivery_info, properties, payload = q.pop_waiting(timeout: 10)
rescue Timeout::Error => e
  puts "received timeout"
ensure
  q.delete
  ch.close
  conn.close
end

Cancel consumer

consumer   = q.subscribe(:block => false) do |_, _, payload|
  puts payload
end

puts "Consumer: #{consumer.consumer_tag} created"

sleep 1

# Cancel consumer
cancel_ok = consumer.cancel

Read more:

Get one message and unsubscribe


msg = nil
q.subscribe(block: true) do |delivery_info, metadata, payload|
  msg = payload
  ch.consumers[delivery_info.consumer_tag].cancel
end

unless msg.nil?
   # we got message!
end

Read more:

Timeout

begin
  msg = nil

  timeout 10 do
        begin
          ret = q.subscribe(:block => true) do |delivery_info, metadata, body|
            msg = body

          end
        rescue Interrupt => _
          ch.close
          conn.close
        rescue Error => e
          x=0
        end # / subscribe

        q.unsubscribe if ret == :timed_out
  end # / timeout

rescue Timeout::Error
  msg = nil
end

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment