Skip to content

Instantly share code, notes, and snippets.

@doryokujin
Created November 12, 2011 10:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save doryokujin/1360341 to your computer and use it in GitHub Desktop.
# Tail several mongod logs and parse each line into
# time / type / verb / object / message fields.
<source>
  type tail
  format /^(?<time>[^ ]* [^ ]*[ ]*[^ ]* [^ ]*) \[(?<type>[^\]]*)\] (?<verb>[^ ]*) (?<object>[^ ]*) (?<message>.*)$/
  time_format %a %b %e %H:%M:%S
  path /log/mongo/delta6/shard05/mongo.log,/log/mongo/delta6/shard11/mongo.log,/log/mongo/delta6/shard17/mongo.log,/log/mongo/delta6/shard23/mongo.log
  tag mongo
</source>
# database / collection / host / port and the <metrics> sections are read by
# the output registered as 'mongo_metrics', so this match must use that type.
<match mongo**>
  type mongo_metrics
  time_slice_wait 1m
  buffer_path /home/doryokujin/buffer.log
  database test
  # NOTE(review): this only names the collection "capped"; the plugin creates
  # a capped collection only when a 'capped' line (and optionally cap_size /
  # cap_max) is present.
  collection capped
  host localhost
  port 27017
  <metrics>
    name metrics_test
    # one of d, h, m, s (default h)
    partition_time s
    partition_key type
    # comma-separated $field references used as the metric key
    key $type,$verb,$object
    # value defaults to the constant 1 when omitted
    # value 1
    # uncomment to convert numeric values to floats
    # float
  </metrics>
</match>
module Fluent
  # Base buffered output that turns every incoming record into zero or more
  # metric documents, one per <metrics> section in the configuration, and
  # hands them to the buffer as concatenated MessagePack data.
  #
  # This class does not define #write; a subclass (e.g. MongoMetricsOutput)
  # supplies it to persist the buffered documents.
  class BasicMetricsOutput < TimeSlicedOutput
    def initialize
      super
      # Loaded here (lazily) rather than at file top, as in the original plugin.
      require 'msgpack'
      require 'time'
      @metrics = []
    end

    # Runs the superclass configuration, then registers one Metrics
    # definition for every <metrics> child element of +conf+.
    def configure(conf)
      super
      conf.elements.select { |e| e.name == 'metrics' }.each { |e| add_metrics(e) }
    end

    # Builds a Metrics definition from one <metrics> element and registers it.
    # Raises ConfigError (from Metrics#configure) on invalid settings.
    def add_metrics(conf)
      m = Metrics.new
      m.configure(conf)
      @metrics << m
    end

    # One metric definition parsed from a <metrics> element.
    class Metrics
      # strftime pattern for each accepted 'partition_time' granularity.
      PARTITION_TIME_FORMATS = {
        'd' => '%Y-%m-%d',
        'h' => '%Y-%m-%d %H',
        'm' => '%Y-%m-%d %H:%M',
        's' => '%Y-%m-%d %H:%M:%S'
      }.freeze

      # Reads a <metrics> element.
      #
      # Required: 'name', 'partition_key', 'key' (comma-separated expressions).
      # Optional: 'value' (default "1"), 'constant_key_name' (default
      # "_constant_"), 'float' (its mere presence enables float conversion),
      # 'partition_time' (d, h, m or s; default h).
      #
      # Raises ConfigError when a required parameter is missing or
      # 'partition_time' is not one of d, h, m, s.
      def configure(conf)
        @name = conf['name']
        raise ConfigError, "'name' parameter is required on <metrics> directive" unless @name
        @partition_key = conf['partition_key']
        raise ConfigError,"'partition_key' parameter is required on <metrics> directive" unless @partition_key
        key = conf['key']
        raise ConfigError, "'key' parameter is required on <metrics> directive" unless key

        value = conf['value'] || '1'
        constant_key_name = conf['constant_key_name'] || '_constant_'
        # Any value (even an empty string) turns float mode on.
        is_float = !conf['float'].nil?
        @key_proc = BasicMetricsOutput.create_get_value_proc_combined(key, is_float, constant_key_name)
        @value_proc = BasicMetricsOutput.create_get_value_proc_combined(value, is_float, constant_key_name)

        partition_time = conf['partition_time'] || 'h'
        time_format = PARTITION_TIME_FORMATS[partition_time]
        unless time_format
          raise ConfigError, "Unexpected partition_time parameter #{partition_time.dump}: expected d, h, m or s"
        end
        # Time.at yields a Time in the process's local time zone.
        @partition_proc = lambda { |time, _record| Time.at(time).strftime(time_format) }
      end

      # Returns the MessagePack-encoded metric document for +record+, or nil
      # when any referenced key or value field is missing from the record.
      def format(tag, time, record)
        key = @key_proc.call(record)
        value = @value_proc.call(record)
        return nil unless key && value

        {
          'time' => time,
          'name' => @name,
          'partition' => { 'key' => @partition_key, 'time' => @partition_proc.call(time, record) },
          'key' => key,
          'value' => value
        }.to_msgpack
      end
    end

    # Concatenates the encoded documents of every configured metric for
    # this record; metrics whose fields are missing contribute nothing.
    def format(tag, time, record)
      out = String.new # binary (ASCII-8BIT) buffer for MessagePack bytes
      @metrics.each do |m|
        data = m.format(tag, time, record)
        out << data if data
      end
      out
    end

    # Builds a proc mapping a record to a Hash of {field_name => value} for a
    # comma-separated list of expressions, or nil if any value is nil.
    # (Public on purpose: Metrics#configure calls it on the class.)
    def self.create_get_value_proc_combined(expr, is_float, constant_key_name)
      procs = expr.split(',').map { |e| create_get_value_proc(e.strip, is_float, constant_key_name) }
      Proc.new { |record|
        # Build from [name, value] pairs directly; flattening the pairs would
        # also flatten Array-valued fields and corrupt the hash.
        map = Hash[procs.map { |pr| pr.call(record) }]
        map.values.any?(&:nil?) ? nil : map
      }
    end

    # Builds a proc returning a [name, value] pair for one expression:
    # "$field" reads record[field] (numeric strings are converted), anything
    # without a leading symbol is a numeric constant under +constant_key_name+.
    # Raises ConfigError for an unknown leading symbol.
    def self.create_get_value_proc(expr, is_float, constant_key_name)
      m = /\A([^\w])(.*)\Z/.match(expr)
      if m
        sym = m[1]
        key = m[2]
        raise ConfigError, "Unknown metrics expression #{expr.dump}" unless sym == '$'
        Proc.new { |record| [key, coerce_number(record[key], is_float)] }
      else
        # constant
        val = is_float ? expr.to_f : expr.to_i
        Proc.new { |record| [constant_key_name, val] }
      end
    end

    # Converts an integer- or decimal-looking String to a number (integers
    # become Floats when +is_float+); any other value is returned unchanged.
    # Anchored with \A/\z so multi-line strings are not partially matched.
    def self.coerce_number(val, is_float)
      return val unless val.is_a?(String)

      if val =~ /\A[+-]?\d+\z/
        is_float ? val.to_f : val.to_i
      elsif val =~ /\A[+-]?\d+\.?\d*\z/
        val.to_f
      else
        val
      end
    end
  end
end
module Fluent
  # Output plugin "mongo_metrics": stores the metric documents produced by
  # BasicMetricsOutput#format into MongoDB.
  #
  # With 'collection' set, every document goes into that one collection;
  # otherwise each document goes into a collection named after its "name".
  class MongoMetricsOutput < BasicMetricsOutput
    Plugin.register_output('mongo_metrics', self)

    def initialize
      super
      require 'mongo'
    end

    # Reads MongoDB settings in addition to the <metrics> sections handled by
    # BasicMetricsOutput#configure.
    #
    # Options: 'database' (required), 'collection', 'host' (default
    # localhost), 'port' (default 27017), 'capped' with optional 'cap_size'
    # (default 1000m) and 'cap_max'.
    # Raises ConfigError when 'database' is missing.
    def configure(conf)
      super
      # Mongo
      @database = conf['database']
      raise ConfigError, "'database' parameter is required on MongoDump output" unless @database
      @collection = conf['collection']
      @use_common_coll = !@collection.nil?
      @host = conf['host'] || 'localhost'
      # Config values arrive as Strings; pass the driver an Integer port.
      @port = (conf['port'] || 27017).to_i
      # Options used only when a collection has to be created.
      @collection_opts = {}
      if conf.has_key?('capped')
        @collection_opts[:capped] = true
        @collection_opts[:size] = Config.size_value(conf['cap_size'] || '1000m')
        @collection_opts[:max] = Config.size_value(conf['cap_max']) if conf.has_key?('cap_max')
      end
    end

    def start
      @coll = @use_common_coll ? get_collection(@collection) : nil
      super
    end

    def shutdown
      super
      # @db stays nil if no collection was ever opened.
      @db.connection.close if @db
    end

    # Inserts every document of the buffered chunk. Each document is the
    # MessagePack-decoded hash built by Metrics#format:
    #   { "time" => time, "name" => name,
    #     "partition" => { "key" => partition_key, "time" => partition_time },
    #     "key" => key, "value" => value }
    def write(chunk)
      chunk.open { |io|
        begin
          MessagePack::Unpacker.new(io).each { |record|
            # Switch collections per metric name unless a common one is configured.
            if @coll.nil? || (!@use_common_coll && @coll.name != record['name'])
              @coll = get_collection(record['name'])
            end
            @coll.insert(record)
            # Planned alternative (not enabled; don't forget the :upsert
            # option). Sketch only: `key` and record["field"] are not defined
            # here.
            #   @coll.update({ "partition_key" => record["partition"]["key"],
            #                  "partition_time" => record["partition"]["time"] },
            #                { "$inc" => { key => record["field"]["value"] } },
            #                { :upsert => true, :multi => true })
          }
        rescue EOFError
          # EOFError always occurs when the end of the chunk is reached.
        end
      }
    end

    private

    # Returns the named collection, creating it with @collection_opts when it
    # does not exist. The database handle (and its connection) is opened once
    # and reused, so switching collections does not leak connections.
    def get_collection(collection)
      @db ||= Mongo::Connection.new(@host, @port).db(@database)
      if @db.collection_names.include?(collection)
        @db.collection(collection)
      else
        @db.create_collection(collection, @collection_opts)
      end
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment