Skip to content

Instantly share code, notes, and snippets.

@doryokujin
Created October 2, 2011 05:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save doryokujin/1257119 to your computer and use it in GitHub Desktop.
Save doryokujin/1257119 to your computer and use it in GitHub Desktop.
MongoOutput included event time #2
module Fluent
  # Buffered output plugin, registered under the name 'mongo', that inserts
  # event records into a MongoDB collection.
  #
  # Parameters read from the configuration:
  #   database     - target database name (required)
  #   collection   - target collection name (required)
  #   host         - MongoDB host (default 'localhost')
  #   port         - MongoDB port (default 27017)
  #   include_time - when set, each stored record gets a "ts" field holding
  #                  the event time
  #
  # If the configuration has a nested element, its first element configures a
  # capped backup collection: 'size' is required; 'max', 'database' (default
  # 'fluent'), 'collection' (default '__backup'), 'host' and 'port' are
  # optional. Each batch is inserted into the backup collection before it is
  # inserted into the main collection.
  class MongoOutput < BufferedOutput
    Fluent::Plugin.register_output('mongo', self)

    # The driver libraries are required here rather than at file load time,
    # so they are only loaded once the plugin is instantiated.
    def initialize
      super
      require 'mongo'
      require 'msgpack'
    end

    # Reads and validates the plugin configuration.
    #
    # conf - configuration element; read with conf['key'] and conf.elements.
    #
    # Raises ConfigError when 'database' or 'collection' is missing, or when
    # a nested backup element lacks 'size'.
    def configure(conf)
      super

      @database_name = conf['database']
      raise ConfigError, "'database' parameter is required on Mongo output" unless @database_name

      @collection_name = conf['collection']
      raise ConfigError, "'collection' parameter is required on Mongo output" unless @collection_name

      @host, @port = host_and_port(conf)

      # NOTE(review): any non-nil value enables the timestamp, presumably
      # including the string "false" - confirm how callers set this.
      @include_time = conf['include_time']

      # Only the first nested element is consulted; its name is not checked.
      capped_conf = conf.elements.first
      configure_capped(capped_conf) if capped_conf

      # Tracks whether the batch currently being written has already been
      # stored in the backup collection (see #write).
      @backuped = false
    end

    # Opens the connection to the main collection and, when a backup was
    # configured, opens (or creates) the capped backup collection.
    def start
      super
      connection = Mongo::Connection.new(@host, @port)
      @collection = connection.db(@database_name).collection(@collection_name)
      @capped = capped_collection if @capped_argument
    end

    # Closes the MongoDB connections, then runs the parent shutdown.
    #
    # The collections are nil when #start never ran or failed part-way, so
    # each close is guarded. The parent shutdown runs from +ensure+ so that a
    # failing close cannot skip it.
    #
    # The original author notes that Mongo::Connection itself checks whether
    # it is alive or closed - presumably so that a later use still works;
    # TODO confirm against the driver.
    def shutdown
      @collection.db.connection.close if @collection
      @capped.db.connection.close if @capped
    ensure
      super
    end

    # Serializes one event as a MessagePack-encoded [time, record] pair.
    # The optional "ts" field is added later, in #write.
    def format(tag, event)
      [event.time, event.record].to_msgpack
    end

    # Inserts every record of the chunk into MongoDB.
    #
    # When a backup collection is configured, the batch goes there first.
    # @backuped stays true until the main insert succeeds, so if the main
    # insert raises and #write is called again, the backup insert is skipped
    # on that next call.
    def write(chunk)
      records = read_records(chunk)
      # Nothing to store: skip the round trips to MongoDB.
      return if records.empty?

      if @capped && !@backuped
        @capped.insert(records)
        @backuped = true
      end

      @collection.insert(records)
      @backuped = false
    end

    private

    # Applies the nested backup element: validates it and stores the capped
    # collection options and connection settings.
    def configure_capped(capped_conf)
      unless capped_conf.has_key?('size')
        raise ConfigError, "'size' parameter is required on <store> of Mongo output"
      end

      @capped_argument = { :capped => true, :size => Integer(capped_conf['size']) }
      @capped_argument[:max] = Integer(capped_conf['max']) if capped_conf.has_key?('max')

      @capped_database_name = capped_conf['database'] || 'fluent'
      @capped_collection_name = capped_conf['collection'] || '__backup'
      @capped_host, @capped_port = host_and_port(capped_conf)
    end

    # Unpacks the [time, record] pairs written by #format and returns the
    # records as an array, adding "ts" to each when include_time is set.
    def read_records(chunk)
      records = []
      chunk.open do |io|
        begin
          MessagePack::Unpacker.new(io).each do |event_time, record|
            record['ts'] = Time.at(event_time) if @include_time
            records << record
          end
        rescue EOFError
          # EOFError is raised on reaching the end of the chunk; it marks
          # normal completion here.
        end
      end
      records
    end

    # Returns [host, port] from a configuration element, defaulting to
    # 'localhost' and 27017. The port is converted with Integer(), which
    # raises on a non-numeric value.
    def host_and_port(conf)
      host = conf['host'] || 'localhost'
      port = conf['port'] || 27017
      [host, Integer(port)]
    end

    # Returns the backup collection, creating it as a capped collection with
    # the configured options when it does not exist yet.
    def capped_collection
      db = Mongo::Connection.new(@capped_host, @capped_port).db(@capped_database_name)
      if db.collection_names.include?(@capped_collection_name)
        # TODO: Verify that the existing collection matches the capped configuration.
        db.collection(@capped_collection_name)
      else
        db.create_collection(@capped_collection_name, @capped_argument)
      end
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment