Skip to content

Instantly share code, notes, and snippets.

@somic
Created October 20, 2010 23:02
Show Gist options
  • Save somic/637526 to your computer and use it in GitHub Desktop.
Save somic/637526 to your computer and use it in GitHub Desktop.
#!/usr/bin/ruby -I/usr/local/qpid-svn/qpid/ruby
#
#
# Usage/help text for this script. The command-line entry point at the bottom
# of the file prints it when producer mode is started without any file names.
__doc__ = %q(
disttailf.rb - distributed "tail -f"
Aggregates "tail -f" output from multiple machines and multiple files
into a single RabbitMQ pubsub queue (kind of splunk's log consolidation
function)
Usage:
Producer: disttailf.rb [-s broker_host] [-p broker_port] [-x spec_xml] file ...
Consumer: disttailf.rb [-s broker_host] [-p broker_port] [-x spec_xml] -c
)
require 'qpid'
require 'socket'
# Consumer mode: bind a freshly declared queue to the 'amq.topic' exchange with
# the routing pattern 'disttailf.#' and print every message received on it.
#
# For each message two things are written to stdout: a "== <headers> <source>"
# banner (where <source> is the last dot-separated word of the routing key)
# followed by the message body.
#
# client - connected broker client; must respond to #queue(consumer_tag) and
#          #closed?
# ch     - open channel; must respond to #queue_declare, #queue_bind and
#          #basic_consume
#
# Never returns normally. Raises RuntimeError ("Rabbitmq broker disconnected")
# as soon as client.closed? is true.
def consumer(client, ch)
  myqueue = ch.queue_declare()
  ch.queue_bind(:queue => myqueue.queue, :exchange => 'amq.topic',
                :routing_key => 'disttailf.#')
  cons = ch.basic_consume(:queue => myqueue.queue, :no_ack => true)
  ruby_queue = client.queue(cons.consumer_tag)
  while true
    raise "Rabbitmq broker disconnected" if client.closed?

    # Non-blocking pop, so that the closed? check above keeps running even
    # when no messages arrive. Any failure to fetch is treated as "nothing
    # to read right now": back off briefly and poll again.
    # NOTE(review): presumably the queue raises ThreadError when empty, like
    # Ruby's thread Queue - confirm against the qpid client before narrowing.
    begin
      msg = ruby_queue.pop(true)
    rescue StandardError
      sleep(0.5)
      next
    end

    # A message that cannot be printed is reported instead of being silently
    # dropped and mistaken for an empty queue.
    begin
      puts "== #{msg.content.headers[:headers]} " \
           "#{msg.routing_key.split('.')[-1]}"
      puts msg.content.body
    rescue StandardError => e
      $stderr.puts "disttailf: skipping unreadable message " \
                   "(#{e.class}: #{e.message})"
    end
  end
end
# Producer mode: follow the given files with tail_f and publish every new line
# to the 'amq.topic' exchange, echoing each line to stdout as well.
#
# Messages are published with the routing key "disttailf.<host>", where <host>
# is the first label of this machine's hostname (e.g. "web1" for
# "web1.example.com"). Taking the last label instead would collapse every host
# in a domain onto the same key (e.g. "com"). Each message carries a headers
# hash with 'sent' (epoch seconds at publish time) and 'filename'.
#
# client    - broker client; not used here, kept for a signature parallel to
#             consumer
# ch        - open channel; must respond to #basic_publish
# filenames - array of paths to follow
#
# Does not return normally: tail_f polls forever.
def producer(client, ch, filenames)
  short_host = Socket.gethostname.split('.')[0].to_s
  rkey = "disttailf." + short_host
  tail_f(filenames) do |filename, line|
    h = { 'sent' => Time.now.to_i, 'filename' => filename }
    c = Qpid::Content.new({ :headers => h }, line)
    ch.basic_publish(:routing_key => rkey, :content => c,
                     :exchange => 'amq.topic')
    puts "#{filename}: #{line}"
  end
end
# Follow several files at once, calling the block with (path, line) for every
# line read. Files are read from their beginning, then polled for new data.
#
# filenames - array of paths to follow; paths that cannot be opened yet are
#             retried later
# block     - called as block.call(path, line) for each line
#
# Polling loop, one pass every 0.5 s:
#   * after more than 120 passes (about a minute), every path without an open
#     handle is tried again via open_or_nil;
#   * a file whose size on disk is smaller than the current read position, or
#     which can no longer be stat'ed, is reported on stderr, closed and left
#     for the next reopen round;
#   * every other open file is drained line by line until EOF.
#
# Never returns; it only ends when the block (or something else) raises or
# throws out of it.
def tail_f(filenames, &block)
  handles = {}
  filenames.each { |f| handles[f] = open_or_nil(f) }
  reopen_counter = 0
  while true
    if reopen_counter > 120
      reopen_counter = 0
      filenames.reject { |f| handles[f] }.each do |f|
        handles[f] = open_or_nil(f)
      end
    end

    # values returns a copy, so clearing entries while iterating is safe
    handles.values.compact.each do |f|
      begin
        raise "trunc" unless File.stat(f.path).size >= f.tell
      rescue StandardError
        $stderr << "#{f.path}: removed or truncated\n"
        f.close
        handles[f.path] = nil
        next
      end

      begin
        # readline raises EOFError once we have caught up with the writer
        block.call(f.path, f.readline) while true
      rescue EOFError
        true
      end
    end

    reopen_counter += 1
    sleep(0.5)
  end
end
# Try to open +filename+ for reading.
#
# Returns the open File on success, or nil when opening raises any
# StandardError (missing file, permission problem, bad argument, ...), so
# callers can keep a nil placeholder and retry later.
def open_or_nil(filename)
  File.open(filename)
rescue StandardError
  nil
end
# Command-line entry point (only when this file is run directly).
#
# Options:
#   -s / --server   broker host             (default 127.0.0.1)
#   -p / --port     broker port             (default 5672)
#   -x / --specxml  path to AMQP spec XML   (default /etc/amqp0-8.xml)
#   -c / --consume  run as consumer instead of producer
# In producer mode the remaining arguments are the files to follow.
if __FILE__ == $0
  require 'getoptlong'

  server = '127.0.0.1'
  port = 5672
  specxml = '/etc/amqp0-8.xml'
  acts_as_consumer = false

  opts = GetoptLong.new(
    ['--server', '-s', GetoptLong::REQUIRED_ARGUMENT],
    ['--port', '-p', GetoptLong::REQUIRED_ARGUMENT],
    ['--specxml', '-x', GetoptLong::REQUIRED_ARGUMENT],
    ['--consume', '-c', GetoptLong::NO_ARGUMENT])
  opts.each do |opt, arg|
    case opt
    when '--server'
      server = arg
    when '--port'
      port = arg.to_i
    when '--specxml'
      specxml = arg
    when '--consume'
      acts_as_consumer = true
    end
  end

  # Option parsing has removed the switches from ARGV, so what is left is the
  # file list. Reject an empty list before touching the network, so a usage
  # mistake is reported even when the broker is unreachable.
  if !acts_as_consumer && ARGV.length == 0
    puts __doc__
    raise "List of file names is empty - nothing to do"
  end

  # set up connection to rabbitmq broker
  client = Qpid::Client.new(server, port, Spec.load(specxml))
  client.start({ "LOGIN" => "guest", "PASSWORD" => "guest" })
  ch = client.channel(1)
  ch.channel_open()

  if acts_as_consumer
    consumer(client, ch)
  else
    producer(client, ch, ARGV)
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment