@makuk66
Created August 3, 2011 13:31
Simple consumer for the RabbitMQ firehose; see http://www.rabbitmq.com/firehose.html
#!/home/mqtester/.rvm/wrappers/ruby-1.9.2-p290@mqtester/ruby
# encoding: utf-8
require "rubygems"
require "amqp"
@exchange_name = "amq.rabbitmq.trace"
class Consumer
  def handle_message(metadata, payload)
    puts "routing_key: #{metadata.routing_key}, payload: #{payload}"
  end
end
class Worker
  def initialize(channel, exchange_name, queue_name, consumer = Consumer.new)
    @exchange_name = exchange_name
    @queue_name = queue_name
    @channel = channel
    @channel.on_error(&method(:handle_channel_exception))
    @consumer = consumer
  end

  def start
    # Declare the trace exchange with the options it already has on the broker.
    @exchange = @channel.topic(@exchange_name, :durable => true, :auto_delete => false, :internal => true)
    # Transient queue that is removed once the consumer goes away.
    @queue = @channel.queue(@queue_name, :durable => false, :auto_delete => true)
    # '#' matches every routing key, so all traced messages are received.
    @queue.bind(@exchange, :routing_key => '#')
    @queue.subscribe(&@consumer.method(:handle_message))
  end

  def handle_channel_exception(channel, channel_close)
    puts "Oops... a channel-level exception: code = #{channel_close.reply_code}, message = #{channel_close.reply_text}"
  end
end
AMQP.start("amqp://user:password@localhost/vhost") do |connection, open_ok|
  channel = AMQP::Channel.new(connection)
  # An empty queue name asks the broker to generate one.
  worker = Worker.new(channel, @exchange_name, '')
  worker.start
end
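
The default Consumer prints the raw routing key. According to the firehose documentation linked above, traced messages are published with routing keys of the form publish.{exchangename} or deliver.{queuename}, so a consumer can split the key to tell the two cases apart. The following is a minimal sketch of such a consumer, not part of the original script: the TraceConsumer class and its parse_routing_key helper are hypothetical names introduced here for illustration.

```ruby
# Hypothetical alternative to Consumer (not part of the original gist).
# Splits the firehose routing key into an event type ("publish" or
# "deliver") and the exchange or queue name it refers to.
class TraceConsumer
  # Hypothetical helper: "publish.my.exchange" => ["publish", "my.exchange"].
  # Splits on the first dot only, since exchange and queue names may contain dots.
  def self.parse_routing_key(routing_key)
    event, name = routing_key.to_s.split(".", 2)
    [event, name.to_s]
  end

  # Same signature as Consumer#handle_message, so it can be passed to Worker.
  def handle_message(metadata, payload)
    event, name = self.class.parse_routing_key(metadata.routing_key)
    target = event == "publish" ? "exchange" : "queue"
    puts "#{event} #{target}=#{name.inspect} bytes=#{payload.to_s.bytesize}"
  end
end
```

Assuming the Worker class above, it would be wired in through the optional fourth argument: Worker.new(channel, @exchange_name, '', TraceConsumer.new). Nothing arrives on the trace exchange until tracing is switched on for the vhost, for example with rabbitmqctl trace_on -p vhost.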