#!/usr/bin/ruby -I/usr/local/qpid-svn/qpid/ruby # # __doc__ = %q( disttailf.rb - distributed "tail -f" Aggregates "tail -f" output from multiple machines and multiple files into a single RabbitMQ pubsub queue (kind of splunk's log consolidation function) Usage: Producer: disttailf.rb [-s broker_host] [-p broker_port] [-x spec_xml] file ... Consumer: disttailf.rb [-s broker_host] [-p broker_port] [-x spec_xml] -c ) require 'qpid' require 'socket' def consumer(client, ch) myqueue = ch.queue_declare() ch.queue_bind(:queue=>myqueue.queue, :exchange=>'amq.topic', :routing_key=>'disttailf.#') cons = ch.basic_consume(:queue=>myqueue.queue, :no_ack => true) ruby_queue = client.queue(cons.consumer_tag) while true raise "Rabbitmq broker disconnected" if client.closed? begin msg = ruby_queue.pop(non_block=true) puts "== #{msg.content.headers[:headers]} " \ "#{msg.routing_key.split('.')[-1]}" puts msg.content.body rescue sleep(0.5) end end end def producer(client, ch, filenames) rkey = "disttailf." + Socket.gethostname.split('.')[-1] tail_f(filenames) do |filename, line| h = {'sent' => Time.now.to_i, 'filename' => filename } c = Qpid::Content.new({:headers=>h}, line) ch.basic_publish(:routing_key=>rkey, :content=>c, :exchange=>'amq.topic') puts "#{filename}: #{line}" end end def tail_f(filenames, &block) filedict = Hash.new filenames.each { |f| filedict[f] = open_or_nil(f) } reopen_counter = 0 while true: if reopen_counter > 120 reopen_counter = 0 filenames.reject { |f| filedict[f] }.each { |f| filedict[f] = open_or_nil(f) } end filedict.values.reject { |f| not f }.each do |f| begin raise "trunc" unless File.stat(f.path).size >= f.tell rescue $stderr << "#{f.path}: removed or truncated\n" f.close filedict[f.path] = nil next end begin block.call(f.path,f.readline) while true rescue EOFError true end end reopen_counter += 1 sleep(0.5) end # while true end def open_or_nil(filename) begin File.open(filename) rescue nil end end if __FILE__ == $0 require 'getoptlong' server = '127.0.0.1' port = 5672 specxml = '/etc/amqp0-8.xml' acts_as_consumer = false opts = GetoptLong.new( ['--server', '-s', GetoptLong::REQUIRED_ARGUMENT], ['--port', '-p', GetoptLong::REQUIRED_ARGUMENT], ['--specxml', '-x', GetoptLong::REQUIRED_ARGUMENT], ['--consume', '-c', GetoptLong::NO_ARGUMENT]) opts.each do |opt,arg| case opt when '--server' server = arg when '--port' port = arg.to_i when '--specxml' specxml = arg when '--consume' acts_as_consumer = true end end # set up connection to rabbitmq broker client = Qpid::Client.new(server, port, spec=Spec.load(specxml)) client.start({ "LOGIN" => "guest", "PASSWORD" => "guest" }) ch = client.channel(1) ch.channel_open() if acts_as_consumer consumer(client, ch) else if ARGV.length == 0 puts __doc__ raise "List of file names is empty - nothing to do" end producer(client, ch, ARGV) end end