Skip to content

Instantly share code, notes, and snippets.

@anolson
Created November 10, 2011 21:53
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save anolson/1356367 to your computer and use it in GitHub Desktop.
Save anolson/1356367 to your computer and use it in GitHub Desktop.
Log streaming in Ruby.
# Tails one log file on a background thread and pushes every line it reads
# onto the queue of each registered stream.
#
# Streams are stored in a hash keyed by +stream.key+. That hash is iterated by
# the tailing thread while other threads register and expire streams, so every
# access made by this class is serialised through an internal mutex.
class LogProducer
  attr_accessor :streams

  # @param options [Hash]
  # @option options [String] :filename path passed to File::Tail::Logfile.tail
  # @option options [#key, #queue, #idle?] :stream optional first stream to register
  def initialize(options = {})
    @streams = {}
    @lock = Mutex.new
    @filename = options[:filename]
    # A producer may be created before any stream exists; calling
    # add_stream(nil) would fail on nil.key.
    add_stream(options[:stream]) if options[:stream]
  end

  # Registers a stream under its key, replacing any stream with the same key.
  #
  # @param stream [#key, #queue, #idle?]
  # @return the stream that was registered
  def add_stream(stream)
    @lock.synchronize { @streams[stream.key] = stream }
  end

  # Starts the tailing thread. Calling it again while the thread is alive
  # returns the existing thread instead of spawning a second tailer.
  #
  # @return [Thread]
  def start
    return @thread if @thread && @thread.alive?

    @thread = Thread.new do
      File::Tail::Logfile.tail(@filename, :backward => 10) do |line|
        # Hold the lock while fanning out so the registry cannot change
        # underneath the iteration.
        @lock.synchronize do
          @streams.each_value { |stream| stream.queue << line }
        end
      end
    end
  end

  # Kills the tailing thread; does nothing if #start was never called.
  def stop
    @thread.kill if @thread
  end

  # Drops streams that report themselves idle, then tells the caller whether
  # any stream is left.
  #
  # @return [Boolean] true when no streams remain
  def should_stop?
    @lock.synchronize do
      delete_idle_streams
      @streams.empty?
    end
  end

  private

  # Removes idle streams in place. Callers must already hold @lock.
  def delete_idle_streams
    @streams.delete_if { |_key, stream| stream.idle? }
  end
end
require 'digest'
require 'securerandom'
# One consumer's view of a tailed log: a queue that a producer fills with
# lines, plus the bookkeeping needed to tell when the consumer has stopped
# reading.
class LogStream
  # Seconds without a #log_messages call after which the stream counts as idle.
  IDLE_THRESHOLD = 20

  attr_accessor :queue, :key, :filename

  def initialize
    @queue = Queue.new
    @key = generate_hash
    @last_read_at = Time.now.to_i
  end

  # Drains the lines currently queued and marks the stream as freshly read.
  #
  # Only the number of lines present at the time of the call is taken, so a
  # busy producer cannot keep this method looping.
  #
  # @return [Array] queued lines in arrival order; empty when nothing is queued
  def log_messages
    @last_read_at = Time.now.to_i
    messages = []
    begin
      # Non-blocking pop: if another reader empties the queue after size was
      # read, stop instead of blocking forever on an empty queue.
      @queue.size.times { messages << @queue.pop(true) }
    rescue ThreadError
      # Queue ran dry early; return what was collected.
    end
    messages
  end

  # @return [Boolean] true once more than IDLE_THRESHOLD seconds have passed
  #   since the last read (or since creation)
  def idle?
    idle_time > IDLE_THRESHOLD
  end

  # @return [Integer] whole seconds since the last read (or since creation)
  def idle_time
    Time.now.to_i - @last_read_at
  end

  # Builds the stream's identifier.
  #
  # The key used to be a SHA-2 digest of the current epoch second, so every
  # stream created within the same second got the same key. It is now 32
  # random bytes, rendered as 64 hex characters like the old digest.
  #
  # @return [String] 64 lowercase hex characters
  def generate_hash
    SecureRandom.hex(32)
  end
end
require 'rubygems'
require 'file/tail'
require 'thread'
require './log_stream'
require './log_producer'
# Process-wide entry point for log streaming.
#
# Keeps one LogProducer per filename, hands out stream keys to callers, and
# runs a single background sweeper thread that stops producers whose streams
# have all gone idle.
class LogStreamer
  # Guards the producer registry, which is mutated both by callers and by the
  # sweeper thread.
  @@lock = Mutex.new

  # Creates a stream for +filename+, attaches it to that file's producer
  # (starting one if needed) and makes sure the sweeper is running.
  #
  # @param filename [String]
  # @return [String] the key to pass to .log_messages
  def self.start_log_stream(filename)
    log_stream = LogStream.new
    start_producer(filename, log_stream)
    start_sweeper
    log_stream.key
  end

  # Returns the lines queued for the given stream.
  #
  # The sweeper removes idle streams and their producers, so a lookup can
  # legitimately miss; that case returns an empty array instead of failing
  # with NoMethodError on nil.
  #
  # @param filename [String]
  # @param key [String] value returned by .start_log_stream
  # @return [Array] queued lines; empty when the file or key is unknown
  def self.log_messages(filename, key)
    stream = @@lock.synchronize do
      producer = producers[filename]
      producer && producer.streams[key]
    end
    stream ? stream.log_messages : []
  end

  # @return [Hash] registry of producers keyed by filename
  def self.producers
    @@producers ||= {}
  end

  # Attaches +stream+ to the producer for +filename+, creating and starting
  # the producer when none exists yet.
  def self.start_producer(filename, stream)
    # Held for the whole check-then-act so the sweeper cannot stop and drop
    # the producer between the lookup and add_stream.
    @@lock.synchronize do
      if producers.key?(filename)
        producers[filename].add_stream(stream)
      else
        producers[filename] = LogProducer.new(:filename => filename, :stream => stream)
        producers[filename].start
      end
    end
  end

  # Starts the sweeper thread if it is not running. A sweeper that has died
  # is replaced rather than left in place forever.
  #
  # @return [Thread] the live sweeper thread
  def self.start_sweeper
    @@lock.synchronize do
      if defined?(@@sweeper) && @@sweeper && @@sweeper.alive?
        @@sweeper
      else
        @@sweeper = Thread.new do
          loop do
            sweep_producers
            sleep(5)
          end
        end
      end
    end
  end

  # Stops and unregisters every producer that reports it should stop.
  def self.sweep_producers
    @@lock.synchronize do
      producers.delete_if do |_filename, producer|
        next false unless producer.should_stop?

        puts "Stopping producer thread."
        producer.stop
        true
      end
    end
  end
  private_class_method :sweep_producers
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment