Created
May 18, 2012 00:12
-
-
Save eliaslevy/2722379 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby
require 'amqp'
require 'logger'
# Connection options handed to AMQPConnection (and from there to
# AMQP.connect). :retry_freq is not a stock AMQP option; it is read by
# AMQPConnection#retry_freq as the number of seconds to wait between
# (re)connection attempts.
amqp_connection_config = {
  :host       => "localhost",
  :port       => 5672,
  :user       => "guest",
  :pass       => "guest",
  :vhost      => "/",
  :ssl        => false,
  :frame_max  => 131072,
  :timeout    => 2,
  :logging    => true,
  :retry_freq => 2
}
# Definition of the queue to consume from: its name plus the options hash
# that is passed along when the queue is declared.
amqp_queue_config = {
  :name => 'foo',
  :opts => { :durable => true, :nowait => false }
}
# A class that implements a reliable AMQP connection. It will attempt to
# connect to the AMQP broker, and retry until it succeeds. Once connected,
# if the connection is lost for any reason, it will attempt to reconnect
# and recover any existing channels, exchanges, bindings, queues and
# consumers.
class AMQPConnection
  # Seconds to wait between connection attempts when the configuration
  # does not provide :retry_freq.
  DEFAULT_RETRY_FREQ = 2

  # Opens the connection immediately (see #connect).
  #
  # @param [Hash] connection_config The AMQP connection options, per the
  #   AMQP gem. The hash is not modified; the connection keeps its own copy
  #   with the failure handlers added.
  #
  # In addition to the usual AMQP gem connection options it has these other
  # options:
  # @option connection_config [Fixnum] :retry_freq How long to wait in seconds
  #   between attempts when making an initial connection or restarting a
  #   failed one. Default: 2 seconds.
  #
  # @param [Logger] log Logger to use for writing log messages.
  #   Defaults to logging to STDOUT at DEBUG log level.
  #
  # @api public
  def initialize(connection_config, log = nil)
    init_error_handlers = {
      :on_tcp_connection_failure          => method(:on_tcp_connection_failure),
      :on_possible_authentication_failure => method(:on_possible_authentication_failure)
    }
    # Non-destructive merge: the caller's hash must not end up holding
    # Method objects bound to this instance.
    @connection_config = connection_config.merge(init_error_handlers)
    @ready  = false
    @logger = log || default_logger
    connect
  end

  # @return [Boolean] Whether the AMQP connection is up and available.
  # @api public
  def ready?
    @ready
  end

  # @return [AMQP::Session] The underlying AMQP connection object.
  # @api public
  # @see AMQP::Session
  def conn
    @amqp
  end

  # Disconnects from the AMQP broker.
  #
  # @param [Boolean] stop_em Whether to stop the EventMachine reactor. Default: true.
  # @api public
  def stop(stop_em = true)
    @ready = false
    # XXX cancel any consumers and wait for them to finish their work
    @amqp.disconnect do
      @logger.info("AMQP Disconnected. Exiting...")
      EM.stop if stop_em
    end
  end

  private

  # Builds the fallback logger used when the caller supplies none.
  def default_logger
    logger = Logger.new(STDOUT)
    logger.level = Logger::DEBUG
    logger
  end

  # Seconds to wait between (re)connection attempts.
  def retry_freq
    @connection_config[:retry_freq] || DEFAULT_RETRY_FREQ
  end

  # Schedules another attempt after the *initial* connection failed.
  def retry_initial_connection
    @amqp.reconnect(false, retry_freq)
    # Must reregister the on_open method. It appears not to survive if we use
    # reconnect on an AMQP::Session that did not connect successfully initially.
    @amqp.on_open(&method(:on_open))
  end

  # Handler passed to AMQP.connect as :on_tcp_connection_failure.
  def on_tcp_connection_failure(settings)
    @logger.error "Failed to connect to #{@amqp.broker_endpoint}. " \
                  "Reconnecting in #{retry_freq} seconds."
    retry_initial_connection
  end

  # Handler passed to AMQP.connect as :on_possible_authentication_failure.
  def on_possible_authentication_failure(settings)
    @logger.error "Authentication possibly failed to #{@amqp.broker_endpoint} " \
                  "as #{@amqp.username}. Reconnecting in #{retry_freq} seconds."
    retry_initial_connection
  end

  # An established connection dropped: mark it unavailable and ask the
  # connection to reconnect after retry_freq seconds.
  def on_tcp_connection_loss(connection, settings)
    @ready = false
    @logger.error "Connection lost to #{@amqp.broker_endpoint}. " \
                  "Reconnecting in #{retry_freq} seconds."
    connection.reconnect(false, retry_freq)
  end

  # Marks the connection unavailable and logs. It does not schedule a
  # reconnect itself; on_tcp_connection_loss is the handler that does.
  def on_connection_interruption(connection)
    @ready = false
    @logger.error "Connection interrupted to #{@amqp.broker_endpoint}."
  end

  # The connection is open: mark it available.
  def on_open
    @ready = true
    @logger.info { "Connected to #{@amqp.broker_endpoint} as #{@amqp.username}" }
  end

  # The connection was re-established after a loss. This is a success
  # event, so it is logged at INFO like on_open rather than at ERROR.
  def on_recovery(connection, settings)
    @ready = true
    @logger.info "Connection to #{@amqp.broker_endpoint} recovered."
  end

  # Logs the details of a connection-level exception sent by the broker.
  def on_error(connection, connection_close)
    @logger.error "Received a connection-level exception from #{@amqp.broker_endpoint}. " \
                  "Class #{connection_close.class_id}, " \
                  "Method #{connection_close.method_id}, " \
                  "Status #{connection_close.reply_code}, " \
                  "Message: #{connection_close.reply_text}"
  end

  # Creates the AMQP session and wires every lifecycle callback to the
  # matching private handler of this object.
  def connect
    @amqp = AMQP.connect(@connection_config)
    @amqp.on_open(&method(:on_open))
    @amqp.on_tcp_connection_loss(&method(:on_tcp_connection_loss))
    @amqp.on_connection_interruption(&method(:on_connection_interruption))
    @amqp.on_recovery(&method(:on_recovery))
    @amqp.on_error(&method(:on_error))
    @amqp.logger = @logger
  end
end
# A class that implements a reliable AMQP consumer. It will attempt to
# connect to the AMQP broker, and retry until it succeeds. Once connected,
# if the connection is lost for any reason, it will attempt to reconnect
# and recover any existing channels, exchanges, bindings, queues and
# consumers.
class AMQPConsumer < AMQPConnection
  # Subscribes to the specified queue, feeding messages to the block given.
  #
  # This implementation provides no concurrency. The given block will be
  # called serially with a single message at a time. The block will not
  # be called with a new message until it has finished processing the
  # current message.
  #
  # @note Messages passed to the given block must be explicitly acknowledged
  #   or rejected.
  #
  # The channel is set in auto recovery mode. If the connection is lost, once
  # reestablished the channel, queue, bindings, and consumers will be
  # reestablished automatically.
  #
  # @param [Hash] queue_def The queue to subscribe to.
  # @option queue_def [String] :name The name of the queue.
  # @option queue_def [Hash] :opts The queue options, as defined in
  #   AMQP::Queue#initialize.
  # @param [Fixnum] prefetch Number of messages to prefetch. Correct tuning
  #   can lead to lower latencies.
  #
  # @yield [headers, payload]
  # @yieldparam [AMQP::Header] headers Headers (metadata) associated with this
  #   message (for example, routing key).
  # @yieldparam [String] payload Message body (content).
  #
  # @return [AMQP::Channel] The channel created for this subscription.
  #
  # @api public
  # @see AMQP::Queue#initialize
  def subscribe(queue_def, prefetch, &callback)
    # The prefetch QoS setting is set per channel, not per subscription.
    # Thus, it is best to use a distinct channel per subscription to enable
    # independent prefetch values.
    @logger.info do
      "Subscribing to queue #{queue_def[:name]} with opts #{queue_def[:opts].inspect} " \
        "and prefetch of #{prefetch}."
    end

    AMQP::Channel.new(@amqp, AMQP::Channel.next_channel_id,
                      :auto_recovery => true, :prefetch => prefetch) do |channel, _open_ok|
      @logger.debug { "Channel ##{channel.id} is now open." }

      channel.on_error do |failed_channel, close|
        handle_channel_error(failed_channel, close, queue_def, prefetch)
      end

      begin
        channel.queue(queue_def[:name], queue_def[:opts]) do |queue, _declare_ok|
          # Explicit acknowledgement mode: the callback must ack or reject.
          queue.subscribe({ :ack => true }, &callback)
        end
      rescue AMQP::Error => e
        @logger.error "Got AMQP::Error #{e}"
        # XXX
      end
    end
  end

  private

  # Logs a channel-level exception and reopens the channel via #reuse.
  #
  # NOTE(review): the second log line announces a restarted subscription, but
  # this handler only calls #reuse; it does not re-declare the queue or the
  # consumer itself. Confirm against the AMQP gem whether #reuse restores them.
  def handle_channel_error(channel, close, queue_def, prefetch)
    @logger.error "Channel level exception on channel ##{channel.id}. " \
                  "Class #{close.class_id}, " \
                  "Method #{close.method_id}, " \
                  "Status #{close.reply_code}, " \
                  "Message: #{close.reply_text}"
    # Restart subscription
    channel.reuse
    @logger.error "Restarting subscription to queue #{queue_def[:name]} " \
                  "with opts #{queue_def[:opts].inspect} and prefetch of #{prefetch}."
  end
end
# Script entry point: start the reactor, open the consumer connection and,
# two seconds later, subscribe to the configured queue with a prefetch of 10.
EventMachine.run do
  begin
    amqp = AMQPConsumer.new(amqp_connection_config)

    # The subscription is deferred by 2 seconds -- presumably to give the
    # connection time to open before a channel is created on it.
    EM.add_timer(2) do
      amqp.subscribe(amqp_queue_config, 10) do |metadata, _msg|
        puts "Got message"
        # Acknowledge exactly once: acknowledging the same delivery twice
        # makes the broker raise a channel-level exception.
        metadata.ack
      end
    end
  rescue StandardError => e
    # Only StandardError is handled here so that Interrupt / SystemExit
    # still terminate the script.
    puts e
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment