-
-
Save kiyoto/58483af1c69ed786f1f8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module Fluent
  # Fluentd input plugin that, every +stats_interval+ seconds, lists the
  # running Docker containers and emits the contents of selected cgroup
  # statistics files for each of them.
  #
  # Each non-empty line of a statistics file becomes one event whose record
  # holds +:key+, +:value+, optionally +:type+ ('counter'), and "source"
  # ("<tag_prefix>:<hostname>:<container id>"). Events are tagged
  # "<tag_prefix>.<statistics file name>".
  class DockerMetricsInput < Input
    Plugin.register_input('docker_metrics', self)

    config_param :cgroup_path, :string, :default => '/sys/fs/cgroup'
    config_param :stats_interval, :time, :default => 60 # every minute
    config_param :tag_prefix, :string, :default => "docker"

    # [cgroup controller, statistics file] pairs collected per container.
    # The queue-depth file is named blkio.io_queued in the cgroup v1 blkio
    # controller; the former 'blkio.io_service_queued' does not exist there,
    # so that metric was silently skipped.
    METRIC_FILES = [
      ['memory',  'memory.stat'],
      ['cpuacct', 'cpuacct.stat'],
      ['blkio',   'blkio.io_serviced'],
      ['blkio',   'blkio.io_service_bytes'],
      ['blkio',   'blkio.io_queued'],
      ['blkio',   'blkio.sectors']
    ].freeze

    # Record keys matching this pattern are marked with type 'counter'.
    # TODO: address this more elegantly
    COUNTER_KEY_PATTERN = /^(?:cpuacct|blkio|memory_stat_pg)/

    def initialize
      super
      require 'socket'
      @hostname = Socket.gethostname
    end

    def configure(conf)
      super
    end

    # Creates the event loop, attaches the repeating timer that triggers
    # #get_metrics, and runs the loop on a background thread.
    def start
      @loop = Coolio::Loop.new
      tw = TimerWatcher.new(@stats_interval, true, @log, &method(:get_metrics))
      tw.attach(@loop)
      @thread = Thread.new(&method(:run))
    end

    # Thread body: runs the event loop and logs any error that escapes it.
    def run
      @loop.run
    rescue
      log.error "unexpected error", :error=>$!.to_s
      log.error_backtrace
    end

    # Metrics collection methods

    # Emits every statistics file in METRIC_FILES for every container
    # returned by #list_container_ids.
    def get_metrics
      list_container_ids.each do |id|
        METRIC_FILES.each do |metric_type, metric_filename|
          emit_container_metric(id, metric_type, metric_filename)
        end
      end
    end

    # Returns the untruncated ids printed by `docker ps`, one per element.
    def list_container_ids
      `docker ps --no-trunc -q`.split(/\s+/)
    end

    # Parses one cgroup statistics file of one container and emits its lines
    # as a single event stream.
    #
    # id              - container id (used in the path and in "source")
    # metric_type     - cgroup controller directory, e.g. 'memory' or 'blkio'
    # metric_filename - statistics file name, e.g. 'memory.stat'
    # opts            - unused; kept so existing callers keep working
    #
    # Returns nil when the statistics file does not exist, otherwise the
    # result of Engine.emit_stream.
    def emit_container_metric(id, metric_type, metric_filename, opts = {})
      path = "#{@cgroup_path}/#{metric_type}/docker/#{id}/#{metric_filename}"
      return nil unless File.exist?(path)

      key_prefix = metric_filename.gsub('.', '_')
      parser_class = metric_type == 'blkio' ? BlkioStatsParser : KeyValueStatsParser
      parser = parser_class.new(path, key_prefix)

      time = Engine.now
      tag = "#{@tag_prefix}.#{metric_filename}"
      mes = MultiEventStream.new
      parser.parse_each_line do |data|
        # Parsers yield nil for lines they cannot interpret.
        next unless data
        data[:type] = 'counter' if data[:key] =~ COUNTER_KEY_PATTERN
        data["source"] = "#{@tag_prefix}:#{@hostname}:#{id}"
        mes.add(time, data)
      end
      Engine.emit_stream(tag, mes)
    end

    # Stops the event loop and waits for the background thread to finish.
    def shutdown
      @loop.stop
      @thread.join
    end

    # Timer that invokes the given callback on every tick and logs, rather
    # than propagates, any error the callback raises.
    class TimerWatcher < Coolio::TimerWatcher
      def initialize(interval, repeat, log, &callback)
        @callback = callback
        @log = log
        super(interval, repeat)
      end

      def on_timer
        @callback.call
      rescue
        @log.error $!.to_s
        @log.error_backtrace
      end
    end

    # Base class for line-oriented cgroup statistics parsers. Subclasses
    # override #parse_line to turn one line into a record Hash (or nil).
    class CGroupStatsParser
      # path        - statistics file to read
      # metric_type - prefix prepended to every key produced by subclasses
      #
      # Raises ConfigError when the file does not exist.
      def initialize(path, metric_type)
        raise ConfigError unless File.exist?(path)
        @path = path
        @metric_type = metric_type
      end

      # Default implementation: produces nothing (returns nil).
      def parse_line(line)
      end

      # Calls the block once per line with the result of #parse_line.
      # File.foreach closes the file when iteration ends, so no descriptor
      # is left open between collection ticks.
      def parse_each_line(&block)
        File.foreach(@path) do |line|
          block.call(parse_line(line))
        end
      end
    end

    # Parses "<key> <value>" lines into { key:, value: } with an Integer value.
    class KeyValueStatsParser < CGroupStatsParser
      def parse_line(line)
        k, v = line.split(/\s+/, 2)
        return nil unless k && v

        { key: @metric_type + "_" + k, value: v.to_i }
      end
    end

    # Parses "<major>:<minor> <key> <value>" lines into { key:, value: }.
    # Lines that do not match BlkioLineRegexp yield nil.
    class BlkioStatsParser < CGroupStatsParser
      BlkioLineRegexp = /^(?<major>\d+):(?<minor>\d+) (?<key>[^ ]+) (?<value>\d+)/

      def parse_line(line)
        m = BlkioLineRegexp.match(line)
        return nil unless m

        # Convert to Integer so values have the same type as the ones
        # produced by KeyValueStatsParser.
        { key: @metric_type + "_" + m["key"].downcase, value: m["value"].to_i }
      end
    end
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module Fluent
  # Fluentd buffered output plugin that forwards records to Librato Metrics.
  #
  # Each record must contain the fields named by +measurement_key+,
  # +value_key+ and +source_key+; records missing any of them are logged and
  # skipped. The field named by +type_key+ is optional and defaults to
  # "gauge".
  class LibratoOutput < BufferedOutput
    Plugin.register_output('librato', self)

    config_param :email, :string
    config_param :apikey, :string
    config_param :value_key, :string, :default => "value"
    config_param :measurement_key, :string, :default => "key"
    config_param :source_key, :string, :default => "source"
    config_param :type_key, :string, :default => "type"

    # Loads the Librato client library and authenticates with the
    # configured credentials.
    def configure(conf)
      super
      require 'librato/metrics'
      Librato::Metrics.authenticate @email, @apikey
    end

    def start
      # This is where you instantiate resources specific to the output, e.g.
      # database connections, client library, etc.
      super
    end

    # Nothing plugin-specific to flush here: every #write builds and submits
    # its own queue, so no measurements are held between calls.
    def shutdown
      super
    end

    # Submits all valid records of one buffer chunk to Librato.
    #
    # A fresh queue is used per chunk so that, when submission raises and
    # the chunk is retried, measurements from the failed attempt are not
    # still queued and sent a second time.
    def write(chunk)
      queue = Librato::Metrics::Queue.new
      required_keys = [@measurement_key, @value_key, @source_key]

      chunk.msgpack_each { |tag, time, record|
        missing_keys = required_keys.reject { |k| record[k] }
        unless missing_keys.empty?
          log.warn "missing the required field(s) " + missing_keys.join(",")
          next
        end

        queue.add(
          record[@measurement_key].to_s =>
            {
              :source => record[@source_key],
              :value => record[@value_key],
              :type => record[@type_key] || "gauge"
            })
      }

      # Skip the request when every record was rejected; submitting an
      # empty queue is an error in the client library.
      queue.submit unless queue.empty?
    end

    # Serializes one event for the buffer; #write reads it back with
    # chunk.msgpack_each.
    def format(tag, time, record)
      [tag, time, record].to_msgpack
    end
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Collect Docker container metrics once per minute.
<source>
  type docker_metrics
  stats_interval 1m
</source>

# Copy every docker.* event to stdout and to Librato.
# Replace LIBRATO_EMAIL / LIBRATO_APIKEY with real credentials.
<match docker.**>
  type copy
  <store>
    type stdout
  </store>
  <store>
    type librato
    email LIBRATO_EMAIL
    apikey LIBRATO_APIKEY
  </store>
</match>

# NOTE(review): this section appears to be shadowed by the docker.** match
# above (tags matching docker.* also match docker.**) -- confirm whether it
# is still needed.
<match docker.*>
  type null
</match>
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment