Skip to content

Instantly share code, notes, and snippets.

@mrjabba
Created July 27, 2012 21:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mrjabba/3190642 to your computer and use it in GitHub Desktop.
Save mrjabba/3190642 to your computer and use it in GitHub Desktop.
simplest possible thing..works..pub and sub together
def do_it
#simplest possible thing..works..pub and sub together..
puts "do it.."
EventMachine.run do
connection = AMQP.connect('amqp://guest:guest@localhost:5672')
puts "Connected to AMQP broker. Running #{AMQP::VERSION} version of the gem..."
opts = {:durable => true, :ack => true}
ex_name = "my.exchange.name"
q_name = "myqueue"
puts "bleh 1"
channel = AMQP::Channel.new(connection)
channel.auto_recovery = true
if channel.auto_recovering?
puts "Channel #{channel.id} IS auto-recovering"
end
connection.on_tcp_connection_loss do |conn, settings|
puts "[network failure] Trying to reconnect..."
conn.reconnect(false, 2)
end
queue = channel.queue(q_name, opts)
exchange = channel.fanout(ex_name, opts)
puts "bleh 2"
queue.bind(exchange)
channel.on_error do |ch, channel_close|
puts "bleh closing..."
puts channel_close.reply_text
connection.close { EventMachine.stop }
end
queue.status do |number_of_messages, number_of_consumers|
puts
puts "before: # of messages in the queue #{queue.name} = #{number_of_messages}, number_of_consumers = #{number_of_consumers}"
puts
end
queue.subscribe(:ack => true) do |metadata, payload|
puts " ** Received a message: #{payload} **\n\n"
puts "now acknowledge it.."
channel.acknowledge(metadata.delivery_tag, false)
end
EventMachine.add_periodic_timer(2.0) do
message = "pub a message.. #{Time.now}"
puts "publishing this: #{message}"
exchange.publish(message)
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment