Skip to content

Instantly share code, notes, and snippets.

@mrjabba
Created July 20, 2012 17:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mrjabba/3151929 to your computer and use it in GitHub Desktop.
Save mrjabba/3151929 to your computer and use it in GitHub Desktop.
rabbit consumer testing
# Base class for objects that talk to a RabbitMQ broker through the amqp gem.
# It validates the connection setting and hands out a channel via #fetch_channel.
class BaseMessageHandler
  # @param opts [Hash] configuration; +:rabbit_mq_host+ is required and is
  #   passed unchanged to +AMQP.start+ when a channel is requested.
  # @raise [ConfigurationError] when +:rabbit_mq_host+ is missing (nil/false).
  def initialize(opts = {})
    @connection_string = opts[:rabbit_mq_host]
    return if @connection_string

    raise ConfigurationError, 'rabbit_mq_host is not specified'
  end

  # Starts an AMQP session, opens a channel on it and yields that channel to
  # the caller's block. After the block returns, a channel-level error handler
  # is installed and the connection is asked to close; the close callback
  # calls +EventMachine.stop+.
  #
  # Authentication and TCP connection failures are printed and not re-raised.
  #
  # @yield [channel] the freshly opened +AMQP::Channel+
  def fetch_channel
    AMQP.start(@connection_string) do |connection, _open_ok|
      AMQP::Channel.new(connection) do |channel|
        yield channel

        channel.on_error do |_ch, close|
          puts "Handling a channel-level exception on channel: #{close.reply_text}, #{close.inspect}"
        end

        # NOTE(review): a block is handed to EventMachine.stop here; whether
        # EventMachine ever invokes it (and so calls `exit`) should be confirmed.
        connection.close do
          EventMachine.stop { exit }
        end
      end
    end
  rescue AMQP::PossibleAuthenticationFailureError => auth_error
    # Strip credentials from the settings hash before it is printed.
    %i[user pass].each { |key| auth_error.settings.delete(key) }
    # TODO: report to Airbrake
    puts "Authentication failure! AMQP broker closed TCP connection before authentication succeeded #{auth_error.settings}"
  rescue AMQP::TCPConnectionFailed => tcp_error
    puts "Caught AMQP::TCPConnectionFailed => TCP connection failed, as expected. #{tcp_error}"
  end
end
# Message handler that pops a single message from a named queue.
class Consumer < BaseMessageHandler
  attr_reader :queue_name

  # @param opts [Hash] configuration; +:queue_name+ is required here, and the
  #   same hash is forwarded to the superclass initializer.
  # @raise [ConfigurationError] when +:queue_name+ is missing (nil/false).
  def initialize(opts = {})
    @queue_name = opts[:queue_name]
    raise ConfigurationError, 'queue_name is not specified' unless @queue_name

    super(opts)
  end

  # Requests one message from the configured queue via #fetch_channel.
  #
  # @return [Object, nil] the payload handed to the pop callback, or nil when
  #   the callback was never invoked with a truthy payload.
  def consume
    received = nil
    fetch_channel do |channel|
      pop_message(channel) { |body| received = body }
    end
    received
  end

  private

  # Pops from the queue named by @queue_name (declared with DURABILITY_OPTS)
  # and yields the payload to the caller's block, skipping nil/false payloads.
  def pop_message(channel)
    queue = channel.queue(@queue_name, DURABILITY_OPTS)
    queue.pop do |_metadata, body|
      yield body if body
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment