Skip to content

Instantly share code, notes, and snippets.

@ksauzz
Created September 28, 2011 17:57
Show Gist options
  • Save ksauzz/1248678 to your computer and use it in GitHub Desktop.
Save ksauzz/1248678 to your computer and use it in GitHub Desktop.
fluent amqp plugin
module Fluent
class AmqpInput < Input
Fluent::Plugin.register_input('amqp', self)
def initialize
super
require 'amqp'
end
def configure(conf)
super
raise ConfigError, "'tag' parameter is required" unless @tag = conf['tag']
raise ConfigError, "'queue' parameter is required" unless @queue = conf['queue']
@host = conf.has_key?('host') ? conf['host'] : 'localhost'
@port = conf.has_key?('port') ? conf['port'] : 5672
@auto_delete = conf.has_key?('auto_delete') ? conf['auto_delete'] : false
end
def start
super
EventMachine.run do
@connection = AMQP.connect(:host => @host)
channel = AMQP::Channel.new(@connection)
queue = channel.queue(@queue, :auto_delete => @auto_delete)
exchange = channel.direct("")
queue.subscribe do |record|
time = Time.now.to_i
Engine.emit(@tag, Fluent::Event.new(time, record))
end
end
end
def shutdown
super
@connection.close { EventMachive.stop }
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment