Skip to content

Instantly share code, notes, and snippets.

@pdlug
Created July 8, 2011 17:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pdlug/1072288 to your computer and use it in GitHub Desktop.
Save pdlug/1072288 to your computer and use it in GitHub Desktop.
Test AMQP consumer/producer with auto recovery
# CONSUMER:
require 'amqp'
# Declares the consumer-side topology on +connection+ and starts consuming.
#
# Opens a channel with auto-recovery enabled, declares the durable direct
# exchange 'consumers' and the durable queue 'consumer', binds the queue to
# the exchange with the key 'consumer' and subscribes to it. Each delivery is
# printed; if the message carries a reply_to property, a "Processed ..."
# reply is published to that routing key via the default exchange.
#
# @param connection the AMQP connection the channel is opened on
#   (presumably the one yielded by AMQP.start -- confirm against the caller)
def setup_connection(connection)
  puts "Setting up connection"

  channel = AMQP::Channel.new(connection, :auto_recovery => true)
  channel.on_error do |_ch, channel_close|
    # Channel-level errors are treated as fatal: surface the broker's reply text.
    raise channel_close.reply_text
  end

  exchange = channel.direct('consumers', :durable => true)
  queue    = channel.queue('consumer', :durable => true)

  # Replies go out through this channel's default exchange, looked up once,
  # rather than through AMQP::Exchange.default, which builds a new exchange
  # object per message on the implicit global channel.
  reply_exchange = channel.default_exchange

  queue.bind(exchange, :key => 'consumer').subscribe do |headers, payload|
    puts "Got '#{payload}' for '#{queue.name}'. Headers are #{headers.to_hash.inspect}"

    # Only answer messages that say where the reply should be routed.
    if headers.reply_to
      reply_exchange.publish("Processed #{payload}", :routing_key => headers.reply_to)
    end
  end
end
# Consumer entry point.
#
# Connects to the broker on 127.0.0.1 (vhost '/'), stops the reactor if the
# TCP connection or authentication fails, and once connected installs the
# error and reconnect handlers, sets up the consumer and traps Ctrl-C.
AMQP.start(
  :host    => '127.0.0.1',
  :vhost   => '/',
  :timeout => 0.3,
  :on_tcp_connection_failure          => proc { |_settings| puts "Failed to connect"; EM.stop },
  :on_possible_authentication_failure => proc { |_settings| puts "Failed to authenticate"; EM.stop }
) do |connection, _open_ok|
  connection.on_error do |_conn, connection_close|
    # Deliberately non-fatal for the consumer, but report the error instead of
    # discarding it so broker-initiated closes are visible in the output.
    puts "Connection-level error: #{connection_close.reply_text}"
  end

  connection.on_tcp_connection_loss do |conn, _settings|
    puts "Lost connection to broker, reconnecting..."
    # Arguments are passed positionally as (false, 1); in the amqp gem these
    # are presumably (force, period-in-seconds) -- confirm against the gem docs.
    conn.reconnect(false, 1)
  end

  setup_connection(connection)

  trap(:INT) do
    # Close the connection cleanly, then stop the reactor. EM.stop does not
    # take a block, so nothing is passed to it.
    connection.close { EM.stop } unless connection.closing?
  end
end
# PRODUCER:
require 'amqp'
require 'simple_uuid'
# Declares the producer-side topology on +connection+ and starts publishing.
#
# Opens an auto-recovering channel, looks up the 'consumers' direct exchange
# passively, prints any message the broker returns, and subscribes to a
# uniquely named auto-delete queue that receives replies. A one-second
# periodic timer then publishes a numbered message with the key 'consumer'
# and reply_to set to the reply queue, as long as the connection is up.
#
# @param connection the AMQP connection the channel is opened on
#   (presumably the one yielded by AMQP.start -- confirm against the caller)
def setup_connection(connection)
  puts "Setting up connection"

  channel  = AMQP::Channel.new(connection, AMQP::Channel.next_channel_id, :auto_recovery => true)
  exchange = channel.direct('consumers', :passive => true)

  exchange.on_return do |basic_return, _metadata, payload|
    puts "Message returned: #{payload}, reply_code = #{basic_return.reply_code}, reply_text = #{basic_return.reply_text}"
  end

  # A fresh GUID per run gives this producer its own reply queue name.
  queue_name = SimpleUUID::UUID.new.to_guid
  channel.queue(queue_name, :auto_delete => true).subscribe do |msg|
    puts "got a reply: #{msg}"
  end

  n = 0
  EM.add_periodic_timer(1) do
    if connection.connected?
      # The counter only advances when a publish is actually attempted.
      n += 1
      puts "publishing #{n}"
      exchange.publish("Message #{n}", :mandatory => true, :immediate => true, :key => 'consumer', :reply_to => queue_name)
    else
      puts "No connection to broker!"
    end
  end
end
# Producer entry point.
#
# Connects to the broker on 127.0.0.1 (vhost '/'), stops the reactor if the
# TCP connection or authentication fails, and once connected installs the
# error and reconnect handlers, starts the producer and traps Ctrl-C.
AMQP.start(
  :host    => '127.0.0.1',
  :vhost   => '/',
  :timeout => 0.3,
  :on_tcp_connection_failure          => proc { |_settings| puts "Failed to connect"; EM.stop },
  :on_possible_authentication_failure => proc { |_settings| puts "Failed to authenticate"; EM.stop }
) do |connection, _open_ok|
  connection.on_error do |_conn, connection_close|
    # Connection-level errors are fatal for the producer: raise the reply text.
    raise connection_close.reply_text
  end

  connection.on_tcp_connection_loss do |conn, _settings|
    puts "Lost connection to broker, reconnecting..."
    # Arguments are passed positionally as (false, 1); in the amqp gem these
    # are presumably (force, period-in-seconds) -- confirm against the gem docs.
    conn.reconnect(false, 1)
  end

  setup_connection(connection)

  trap(:INT) do
    # Close the connection cleanly, then stop the reactor. EM.stop does not
    # take a block, so nothing is passed to it.
    connection.close { EM.stop } unless connection.closing?
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment