Skip to content

Instantly share code, notes, and snippets.

@kiyoto
Last active April 25, 2017 20:17
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save kiyoto/58483af1c69ed786f1f8 to your computer and use it in GitHub Desktop.
Save kiyoto/58483af1c69ed786f1f8 to your computer and use it in GitHub Desktop.
module Fluent
class DockerMetricsInput < Input
Plugin.register_input('docker_metrics', self)
config_param :cgroup_path, :string, :default => '/sys/fs/cgroup'
config_param :stats_interval, :time, :default => 60 # every minute
config_param :tag_prefix, :string, :default => "docker"
def initialize
super
require 'socket'
@hostname = Socket.gethostname
end
def configure(conf)
super
end
def start
@loop = Coolio::Loop.new
tw = TimerWatcher.new(@stats_interval, true, @log, &method(:get_metrics))
tw.attach(@loop)
@thread = Thread.new(&method(:run))
end
def run
@loop.run
rescue
log.error "unexpected error", :error=>$!.to_s
log.error_backtrace
end
# Metrics collection methods
def get_metrics
list_container_ids.each do |id|
emit_container_metric(id, 'memory', 'memory.stat')
emit_container_metric(id, 'cpuacct', 'cpuacct.stat')
emit_container_metric(id, 'blkio', 'blkio.io_serviced')
emit_container_metric(id, 'blkio', 'blkio.io_service_bytes')
emit_container_metric(id, 'blkio', 'blkio.io_service_queued')
emit_container_metric(id, 'blkio', 'blkio.sectors')
end
end
def list_container_ids
`docker ps --no-trunc -q`.split /\s+/
end
def emit_container_metric(id, metric_type, metric_filename, opts = {})
path = "#{@cgroup_path}/#{metric_type}/docker/#{id}/#{metric_filename}"
if File.exists?(path)
parser = if metric_type != 'blkio'
KeyValueStatsParser.new(path, metric_filename.gsub('.', '_'))
else
BlkioStatsParser.new(path, metric_filename.gsub('.', '_'))
end
time = Engine.now
tag = "#{@tag_prefix}.#{metric_filename}"
mes = MultiEventStream.new
parser.parse_each_line do |data|
next if not data
# TODO: address this more elegantly
if data[:key] =~ /^(?:cpuacct|blkio|memory_stat_pg)/
data[:type] = 'counter'
end
data["source"] = "#{@tag_prefix}:#{@hostname}:#{id}"
mes.add(time, data)
end
Engine.emit_stream(tag, mes)
else
nil
end
end
def shutdown
@loop.stop
@thread.join
end
class TimerWatcher < Coolio::TimerWatcher
def initialize(interval, repeat, log, &callback)
@callback = callback
@log = log
super(interval, repeat)
end
def on_timer
@callback.call
rescue
@log.error $!.to_s
@log.error_backtrace
end
end
class CGroupStatsParser
def initialize(path, metric_type)
raise ConfigError if not File.exists?(path)
@path = path
@metric_type = metric_type
end
def parse_line(line)
end
def parse_each_line(&block)
File.new(@path).each_line do |line|
block.call(parse_line(line))
end
end
end
class KeyValueStatsParser < CGroupStatsParser
def parse_line(line)
k, v = line.split(/\s+/, 2)
if k and v
{ key: @metric_type + "_" + k, value: v.to_i }
else
nil
end
end
end
class BlkioStatsParser < CGroupStatsParser
BlkioLineRegexp = /^(?<major>\d+):(?<minor>\d+) (?<key>[^ ]+) (?<value>\d+)/
def parse_line(line)
m = BlkioLineRegexp.match(line)
if m
{ key: @metric_type + "_" + m["key"].downcase, value: m["value"] }
else
nil
end
end
end
end
end
module Fluent
class LibratoOutput < BufferedOutput
Plugin.register_output('librato', self)
config_param :email, :string
config_param :apikey, :string
config_param :value_key, :string, :default => "value"
config_param :measurement_key, :string, :default => "key"
config_param :source_key, :string, :default => "source"
config_param :type_key, :string, :default => "type"
def configure(conf)
super
require 'librato/metrics'
Librato::Metrics.authenticate @email, @apikey
@queue = Librato::Metrics::Queue.new
end
def start
# This is where you instantiate resources specific to the output, e.g.
# database connections, client library, etc.
super
end
def shutdown
super
@queue.submit
end
def write(chunk)
chunk.msgpack_each { |tag, time, record|
missing_keys = [@measurement_key, @value_key, @source_key].select { |k| !record[k] }
if missing_keys.length > 0
log.warn "missing the required field(s) " + missing_keys.join(",")
next
end
@queue.add(
record[@measurement_key].to_s =>
{
:source => record[@source_key],
:value => record[@value_key],
:type => record[@type_key] || "gauge"
})
}
@queue.submit
end
def format(tag, time, record)
[tag, time, record].to_msgpack
end
end
end
<source>
type docker_metrics
stats_interval 1m
</source>
<match docker.**>
type copy
<store>
type stdout
</store>
<store>
type librato
email LIBRATO_EMAIL
apikey LIBRATO_APIKEY
</store>
</match>
<match docker.*>
type null
</match>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment