Created
May 16, 2012 16:58
-
-
Save eliaslevy/2712199 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby | |
require 'amqp' | |
amqp_connection_config = { | |
:host => "localhost", | |
:port => 5672, | |
:user => "guest", | |
:pass => "guest", | |
:vhost => "/", | |
:ssl => false, | |
:frame_max => 131072, | |
:timeout => 2, | |
:logging => true, | |
:retry_freq => 2 | |
} | |
class AMQPConnection | |
def initialize(connection_config) | |
init_error_handlers = { | |
:on_tcp_connection_failure => method( :on_tcp_connection_failure ), | |
:on_possible_authentication_failure => method( :on_possible_authentication_failure ) | |
} | |
@connection_config = connection_config.dup | |
@connection_config.merge!(init_error_handlers) | |
@ready = false | |
connect | |
end | |
def ready? | |
@ready | |
end | |
def retry_freq | |
@connection_config[:retry_freq] || 2 | |
end | |
def on_tcp_connection_failure(settings) | |
puts "Failed to connect."; | |
#EM.add_timer(retry_freq) { connect } | |
@amqp.reconnect(false,retry_freq) | |
# Must reregister the on_open method. It appears not to survive if we use reconnect | |
# on an AMQP::Session that did not connect successfully initially. | |
@amqp.on_open &method( :on_open ) | |
end | |
def on_possible_authentication_failure(settings) | |
puts "Authentication possibly failed." | |
#EM.add_timer(retry_freq) { connect } | |
@amqp.reconnect(false,retry_freq) | |
# Must reregister the on_open method. It appears not to survive if we use reconnect | |
# on an AMQP::Session that did not connect successfully initially. | |
@amqp.on_open &method( :on_open ) | |
end | |
def on_tcp_connection_loss(connection, settings) | |
@ready = false | |
puts "Connection lost." | |
connection.reconnect(false,retry_freq) | |
end | |
def on_connection_interruption(connection) | |
@ready = false | |
puts "Connection interrupted" | |
#connection.reconnect(false,2) | |
end | |
def on_open | |
@ready = true | |
puts "Connected" | |
end | |
def on_recovery(connection, settings) | |
@ready = true | |
puts "Connection recovered." | |
end | |
def on_error(connection, connection_close) | |
puts "Handling a connection-level exception." | |
puts | |
puts "AMQP class id : #{connection_close.class_id}" | |
puts "AMQP method id: #{connection_close.method_id}" | |
puts "Status code : #{connection_close.reply_code}" | |
puts "Error message : #{connection_close.reply_text}" | |
end | |
def connect | |
@amqp = AMQP.connect( @connection_config ) | |
@amqp.on_open &method( :on_open ) | |
@amqp.on_tcp_connection_loss &method( :on_tcp_connection_loss ) | |
@amqp.on_connection_interruption &method( :on_connection_interruption ) | |
@amqp.on_recovery &method( :on_recovery ) | |
@amqp.on_error &method( :on_error ) | |
end | |
def stop | |
@ready = false | |
@amqp.disconnect { puts "AMQP Disconnected. Exiting..."; EM.stop } | |
end | |
end | |
amqp = nil | |
shutdown = Proc.new { | |
puts "Got signal" | |
amqp.stop if amqp | |
} | |
Signal.trap "TERM", shutdown | |
Signal.trap "INT", shutdown | |
EventMachine.run do | |
amqp = AMQPConnection.new amqp_connection_config | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment