Skip to content

Instantly share code, notes, and snippets.

@mzp
Created September 3, 2012 22:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mzp/3614275 to your computer and use it in GitHub Desktop.
Save mzp/3614275 to your computer and use it in GitHub Desktop.
typeperf plugin for Fluentd(libuv ver)
module Fluent
class TypeperfInput < Input
Plugin.register_input('typeperf', self)
def initialize
super
require 'csv'
@hostname = `hostname`.chomp!
@line_number = 0
@keys = []
@max_lines = 1000
end
config_param :tag, :string
config_param :delay, :integer, :default => 1
config_param :tmp_file, :string, :default => "C:/tmp/dstat.csv"
def configure(conf)
super
@command = %(typeperf -y -o #{@tmp_file} -si #{@delay} "\\Memory\\Available Bytes" "\\Processor(_Total)\\% Processor Time" "\\Processor(_Total)\\% User Time" "\\Processor(_Total)\\% Privileged Time")
end
def start
reset_typeperf
@loop = Fluent::EventIO::Loop.create
@dw = DstatCSVWatcher.new(@loop, @tmp_file, &method(:receive_lines))
@thread = Thread.new(&method(:run))
end
def shutdown
@dw.close
@loop.stop
@thread.join
@io.close
File.delete(@tmp_file)
end
def run
begin
@loop.run
rescue
$log.error "unexpected error", :error=>$!.to_s
$log.error_backtrace
end
end
def reset_typeperf
# stop current process
Process.kill(:KILL, @pid) if @pid
@dw.close if @dw
@io.close if @io
# remove old file
require 'fileutils'
FileUtils.rm_f @tmp_file
# run new typeperf
FileUtils.touch @tmp_file
@io = IO.popen(@command, "r")
@pid = @io.pid
$log.info "run #{@command}(pid = #{@pid})"
end
def receive_lines(lines)
lines.each do |line|
next if line == ""
line.delete!("\"")
case @line_number
when 0
@keys = CSV.parse_line(line)
else
values = line.split(',')
data = Hash[*@keys.zip(values).flatten]
record = {
'hostname' => @hostname,
'data' => data
}
Engine.emit(@tag, Engine.now, record)
end
if (@line_number % @max_lines) == (@max_lines - 1)
$log.info "switch file"
begin
reset_typeperf
@dw.reset
rescue
$log.error "error on output thread", :error=>$!.to_s
$log.error_backtrace
end
end
@line_number += 1
end
end
class DstatCSVWatcher
attr_accessor :previous, :cur
def initialize(loop, path, &receive_lines)
@path = path
@io = File.open(path)
@pos = 0
@receive_lines = receive_lines
loop.file_stat(@path,&method(:on_change))
end
def reset
@io = File.open(@path)
@pos = 0
end
def on_change(name, e)
cur = File.stat(name).size
buffer = @io.read(cur - @pos)
@pos = cur
lines = []
while line = buffer.slice!(/.*?\n/m)
lines << line.chomp
end
@receive_lines.call(lines)
end
def close
@io.close
end
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment