Skip to content

Instantly share code, notes, and snippets.

@eliaslevy
Created May 18, 2012 00:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save eliaslevy/2722379 to your computer and use it in GitHub Desktop.
Save eliaslevy/2722379 to your computer and use it in GitHub Desktop.
#!/usr/bin/env ruby
require 'amqp'
require 'logger'
amqp_connection_config = {
:host => "localhost",
:port => 5672,
:user => "guest",
:pass => "guest",
:vhost => "/",
:ssl => false,
:frame_max => 131072,
:timeout => 2,
:logging => true,
:retry_freq => 2
}
amqp_queue_config = {
:name => 'foo',
:opts => { :durable => true, :nowait => false }
}
# A class that implements a reliable AMQP connection. It will attempt to
# connect to the AMQP broker, and retry until it succeeds. Once connected,
# if the connection is lost for any reason, it will attempt to reconnect
# and recover any existing channels, exchanges, bindings, queues and
# consumers.
class AMQPConnection
# @param [Hash] connection_config The AMQP connection options, per the
# AQMP gem.
#
# In addition to the usual AMQP gem connection options is has these other
# options:
# @option connection_config [Fixnum] :retry_freq How long to wait in seconds
# between attempts when making an initial connection or restarting a
# failed one. Default: 2 seconds.
#
# @param [Logger] log Logger class to use for writting log messages.
# Defaults to logging to STDOUT at DEBUG log level.
#
# @api public
def initialize(connection_config, log=nil)
init_error_handlers = {
:on_tcp_connection_failure => method( :on_tcp_connection_failure ),
:on_possible_authentication_failure => method( :on_possible_authentication_failure )
}
@connection_config = connection_config.merge!(init_error_handlers)
@ready = false
unless log
log = Logger.new(STDOUT)
log.level = Logger::DEBUG
end
@logger = log
connect
end
# @return [Boolean] Whether we AMQP conection is up and available.
# @api public
def ready?
@ready
end
# @return [AMQP::Session] The underlying AMQP connection object.
# @api public
# @see AMQP::Session
def conn
@amqp
end
# Disconnects from the AMQP broker.
#
# @param [Boolean] stop_em Whether to stop the EventMachine reactor. Default: true.
# @api public
def stop(stop_em = true)
@ready = false
# XXX cancel any consumers and wait for them to finish their work
@amqp.disconnect { @logger.info("AMQP Disconnected. Exiting..."); EM.stop if stop_em }
end
private
def retry_freq
@connection_config[:retry_freq] || 2
end
def on_tcp_connection_failure(settings)
@logger.error "Failed to connect to #{@amqp.broker_endpoint}. " +
"Reconnecting in #{retry_freq} seconds."
#EM.add_timer(retry_freq) { connect }
@amqp.reconnect(false,retry_freq)
# Must reregister the on_open method. It appears not to survive if we use reconnect
# on an AMQP::Session that did not connect successfully initially.
@amqp.on_open( &method( :on_open ) )
end
def on_possible_authentication_failure(settings)
@logger.error "Authentication possibly failed to #{@amqp.broker_endpoint} " +
"as #{@amqp.username}. Reconnecting in #{retry_freq} seconds."
#EM.add_timer(retry_freq) { connect }
@amqp.reconnect(false,retry_freq)
# Must reregister the on_open method. It appears not to survive if we use reconnect
# on an AMQP::Session that did not connect successfully initially.
@amqp.on_open( &method( :on_open ) )
end
def on_tcp_connection_loss(connection, settings)
@ready = false
@logger.error "Connection lost to #{@amqp.broker_endpoint}. " +
"Reconnecting in #{retry_freq} seconds."
connection.reconnect(false,retry_freq)
end
def on_connection_interruption(connection)
@ready = false
@logger.error "Connection interrupted to #{@amqp.broker_endpoint}."
#connection.reconnect(false,2)
end
def on_open
@ready = true
@logger.info { "Connected to #{@amqp.broker_endpoint} as #{@amqp.username}" }
end
def on_recovery(connection, settings)
@ready = true
@logger.error "Connection to #{@amqp.broker_endpoint} recovered."
end
def on_error(connection, connection_close)
@logger.error "Received a connection-level exception from #{@amqp.broker_endpoint}. " +
"Class #{connection_close.class_id}, " +
"Method #{connection_close.method_id}, " +
"Status #{connection_close.reply_code}, " +
"Message: #{connection_close.reply_text}"
end
def connect
@amqp = AMQP.connect( @connection_config )
@amqp.on_open( &method( :on_open ) )
@amqp.on_tcp_connection_loss( &method( :on_tcp_connection_loss ) )
@amqp.on_connection_interruption( &method( :on_connection_interruption ) )
@amqp.on_recovery( &method( :on_recovery ) )
@amqp.on_error( &method( :on_error ) )
@amqp.logger = @logger
end
end
# A class that implements a reliable AMQP consumer. It will attempt to
# connect to the AMQP broker, and retry until it succeeds. Once connected,
# if the connection is lost for any reason, it will attempt to reconnect
# and recover any existing channels, exchanges, bindings, queues and
# consumers.
class AMQPConsumer < AMQPConnection
# Subscribes to the specified queue, feeding messages to the block given.
#
# This implementation provides no concurrency. The given block will be
# called serially with a single message at a time. The block will not
# be called with a new message until is finished processing the current
# message.
#
# @note Messages passed to the given block must be explicitely acknowledged or
# rejected.
#
# The channel is set in auto recovery mode. If the connection is lost, once
# reestablished the channel, queue, bindings, and consumers will be
# reestablished automatically.
#
# @param [Hash] queue_def The queue to subscribe to.
# @option queue_def [String] :name The name of the queue.
# @option queue_def [Hash] :opts The queue options, as defined in
# AMQP::Queue#initialize.
# @param [Fixnum] prefetch Number of messages to prefetch. Correct tunning
# can lead to lower latencies.
#
# @yield [headers, payload]
# @yieldparam [AMQP::Header] headers Headers (metadata) associated with this
# message (for example, routing key).
# @yieldparam [String] payload Message body (content).
#
# @api public
# @see AMQP::Queue#initialize
def subscribe(queue_def, prefetch, &callback)
# the prefetch QoS setting is set per channel, not per subscription.
# thus, its best to use a distinct channel per subscription to enable
# independent prefetch values.
@logger.info { "Subscribing to queue #{queue_def[:name]} with opts #{queue_def[:opts].inspect} and prefetch of #{prefetch}."}
subcription_cb = callback
queue = nil
channel = AMQP::Channel.new(@amqp, AMQP::Channel.next_channel_id, :auto_recovery => true, :prefetch => prefetch) do |ch1, open_ok|
@logger.debug { "Channel ##{ch1.id} is now open." }
ch1.on_error do |ch2, close|
@logger.error "Channel level exception on channel ##{ch2.id}. " +
"Class #{close.class_id}, " +
"Method #{close.method_id}, " +
"Status #{close.reply_code}, " +
"Message: #{close.reply_text}"
# Restart subscription
ch2.reuse
@logger.error "Restarting subscription to queue #{queue_def[:name]} " +
"with opts #{queue_def[:opts].inspect} and prefetch of #{prefetch}."
end
begin
queue = ch1.queue(queue_def[:name], queue_def[:opts]) do |queue2, declare_ok|
queue2.subscribe({:ack => true}, &callback)
end
rescue AMQP::Error => e
@logger.error "Got AMQP::Error #{e.to_s}"
# XXX
end
end
end
end
EventMachine.run do
begin
amqp = AMQPConsumer.new amqp_connection_config
EM::add_timer(2) do
amqp.subscribe(amqp_queue_config, 10) do |metadata, msg|
puts "Got message"
metadata.ack
metadata.ack
end
end
rescue Exception => e
puts e
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment