Created
September 3, 2012 22:42
-
-
Save mzp/3614275 to your computer and use it in GitHub Desktop.
typeperf plugin for Fluentd (libuv version)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module Fluent
  # Fluentd input plugin that runs the Windows `typeperf` command, lets it
  # write performance-counter samples to a CSV file, tails that file and emits
  # one event per sample line.
  #
  # The CSV file is recreated (and typeperf restarted) every @max_lines lines
  # so that it does not grow without bound.
  class TypeperfInput < Input
    Plugin.register_input('typeperf', self)

    def initialize
      super
      # Loaded lazily, when the plugin is instantiated.
      require 'csv'
      require 'fileutils'
      # `chomp` (not `chomp!`): the bang variant returns nil when there is
      # nothing to strip, which would leave @hostname nil.
      @hostname = `hostname`.chomp
      # Number of non-empty lines consumed from the *current* CSV file;
      # line 0 is the header row.
      @line_number = 0
      @keys = []
      @max_lines = 1000
    end

    config_param :tag, :string
    config_param :delay, :integer, :default => 1
    config_param :tmp_file, :string, :default => "C:/tmp/dstat.csv"

    # Builds the typeperf command line from the configured parameters.
    #
    # @param conf the plugin configuration handed over by Fluentd
    def configure(conf)
      super
      # The output path is quoted so that a tmp_file containing spaces is
      # still passed to typeperf as a single argument.
      @command = %(typeperf -y -o "#{@tmp_file}" -si #{@delay} "\\Memory\\Available Bytes" "\\Processor(_Total)\\% Processor Time" "\\Processor(_Total)\\% User Time" "\\Processor(_Total)\\% Privileged Time")
    end

    # Launches typeperf, registers a watcher on its output file and starts
    # the event loop in a background thread.
    def start
      reset_typeperf
      @loop = Fluent::EventIO::Loop.create
      @dw = DstatCSVWatcher.new(@loop, @tmp_file, &method(:receive_lines))
      @thread = Thread.new(&method(:run))
    end

    # Stops the watcher, the event loop and the typeperf process, then
    # removes the temporary CSV file.
    def shutdown
      @dw.close
      @loop.stop
      @thread.join
      # Kill typeperf before closing the pipe: IO#close on a popen pipe waits
      # for the child, and typeperf does not exit by itself.
      stop_typeperf
      # rm_f does not raise when the file is already gone.
      FileUtils.rm_f(@tmp_file)
    end

    # Body of the background thread: runs the event loop and logs any error
    # that escapes it.
    def run
      @loop.run
    rescue
      $log.error "unexpected error", :error=>$!.to_s
      $log.error_backtrace
    end

    # Stops the running typeperf (if any), recreates the CSV file and starts
    # a fresh typeperf process writing to it.
    def reset_typeperf
      # stop current process
      stop_typeperf
      @dw.close if @dw

      # remove old file
      FileUtils.rm_f @tmp_file

      # run new typeperf
      FileUtils.touch @tmp_file
      @io = IO.popen(@command, "r")
      @pid = @io.pid
      $log.info "run #{@command}(pid = #{@pid})"
    end

    # Callback invoked by the watcher with the complete lines appended to the
    # CSV file. The first non-empty line of each file is taken as the header;
    # every later line is emitted as a record keyed by that header.
    #
    # @param lines [Array<String>] lines without their trailing newline
    def receive_lines(lines)
      lines.each do |line|
        next if line == ""

        fields = parse_csv_line(line)
        if fields
          if @line_number == 0
            @keys = fields
          else
            emit_record(fields)
          end
        end
        @line_number += 1
      end

      # Rotate only after the whole batch is consumed, so that lines read
      # from the old file are never interpreted against the new file's state.
      switch_file if @line_number >= @max_lines
    end

    private

    # Kills the typeperf child process (if one is running) and closes its
    # pipe.
    def stop_typeperf
      if @pid
        begin
          Process.kill(:KILL, @pid)
        rescue SystemCallError => e
          # Typically the process has already exited; nothing left to kill.
          $log.warn "failed to kill typeperf", :pid=>@pid, :error=>e.to_s
        end
        @pid = nil
      end
      @io.close if @io && !@io.closed?
      @io = nil
    end

    # Parses one CSV line into its fields.
    #
    # @return [Array<String>, nil] the fields, or nil when the line is not
    #   valid CSV (the problem is logged and the line skipped)
    def parse_csv_line(line)
      CSV.parse_line(line)
    rescue CSV::MalformedCSVError => e
      $log.warn "skip malformed typeperf line", :line=>line, :error=>e.to_s
      nil
    end

    # Emits one event whose 'data' maps each header key to the matching value.
    def emit_record(values)
      record = {
        'hostname' => @hostname,
        'data' => Hash[@keys.zip(values)]
      }
      Engine.emit(@tag, Engine.now, record)
    end

    # Restarts typeperf on a fresh CSV file and rewinds the watcher. The line
    # counter is reset so that the new file's first line is treated as its
    # header instead of being emitted as data.
    def switch_file
      $log.info "switch file"
      reset_typeperf
      @dw.reset
      @line_number = 0
    rescue
      $log.error "error on output thread", :error=>$!.to_s
      $log.error_backtrace
    end

    # Tails a file: on every change notification it reads the newly appended
    # bytes and passes the complete lines to the given callback.
    class DstatCSVWatcher
      # Not used anywhere in this file; kept so the class interface is
      # unchanged.
      attr_accessor :previous, :cur

      # @param loop event loop; must respond to file_stat(path, &callback)
      # @param path [String] file to tail
      # @param receive_lines block called with an Array of complete lines
      def initialize(loop, path, &receive_lines)
        @path = path
        @io = File.open(path)
        @pos = 0
        # Holds a trailing partial line until its newline arrives.
        @buffer = String.new
        @receive_lines = receive_lines
        loop.file_stat(@path,&method(:on_change))
      end

      # Reopens the file from the beginning, e.g. after it was recreated.
      def reset
        # Release the previous handle if the caller did not close it.
        @io.close if @io && !@io.closed?
        @io = File.open(@path)
        @pos = 0
        @buffer = String.new
      end

      # Change callback: reads the bytes appended since the last call and
      # forwards every complete line (without its line terminator).
      #
      # @param name [String] path used to stat the file
      # @param e event argument supplied by the loop; unused
      def on_change(name, e)
        size = File.stat(name).size
        # The file shrank, so it was truncated or replaced: start over.
        reset if size < @pos

        length = size - @pos
        chunk = length > 0 ? @io.read(length) : nil
        if chunk
          # Advance by what was actually read, not by what stat reported.
          @pos += chunk.bytesize
          @buffer << chunk
        end

        lines = []
        # Only newline-terminated lines are extracted; an incomplete tail
        # stays in @buffer until the rest of it has been written.
        while (line = @buffer.slice!(/.*?\n/m))
          lines << line.chomp
        end
        @receive_lines.call(lines)
      end

      # Closes the underlying file; safe to call more than once.
      def close
        @io.close unless @io.closed?
      end
    end
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment