@chrisckchang
Created June 25, 2014 15:43
MongoDB serverStatus input plugin for Fluentd
module Fluent
  class ServerStatusInput < Input
    Plugin.register_input('serverstatus', self)

    config_param :uris, :array, :default => nil
    config_param :uri, :string, :default => "mongodb://localhost:27017"
    config_param :stats_interval, :time, :default => 60 # every minute
    config_param :tag_prefix, :string, :default => "serverstatus"

    def initialize
      super
      require 'mongo'
    end

    def configure(conf)
      super
      unless @uris || @uri
        raise ConfigError, 'uris or uri must be specified'
      end
      if @uris.nil?
        @uris = [@uri]
      end
      @conns = @uris.map do |uri_str|
        uri_str = "mongodb://#{uri_str}" unless uri_str.start_with?("mongodb://")
        uri = Mongo::URIParser.new(uri_str)
        [Mongo::MongoClient.from_uri(uri_str), uri]
      end
    end

    def start
      @loop = Coolio::Loop.new
      tw = TimerWatcher.new(@stats_interval, true, @log, &method(:collect_serverstatus))
      tw.attach(@loop)
      @thread = Thread.new(&method(:run))
    end

    def run
      @loop.run
    rescue
      log.error "unexpected error", :error=>$!.to_s
      log.error_backtrace
    end

    def shutdown
      @loop.stop
      @thread.join
    end

    def collect_serverstatus
      begin
        @conns.each do |conn, conn_uri|
          stats = conn.db('admin').command(:serverStatus => true)
          make_data_msgpack_compatible(stats)
          tag = [@tag_prefix, conn_uri.host.gsub(/[\.-]/, "_"), conn_uri.port].join(".")
          Engine.emit(tag, Engine.now, stats)
        end
      rescue => e
        log.error "failed to collect MongoDB stats", :error_class => e.class, :error => e
      end
    end

    # MessagePack cannot serialize Time values.
    # This convenience method walks the serverStatus response
    # recursively and replaces every Time field with its integer
    # Unix timestamp.
    def make_data_msgpack_compatible(data)
      if [Hash, BSON::OrderedHash].include?(data.class)
        data.each {|k, v|
          if v.respond_to?(:each)
            make_data_msgpack_compatible(v)
          elsif v.class == Time
            data[k] = v.to_i
          end
        }
        # serverStatus's "locks" field has "." as a key, which can't be
        # inserted back into MongoDB without wreaking havoc. Replace it
        # with "global".
        data["global"] = data.delete(".") if data["."]
      elsif data.class == Array
        data.each_with_index { |v, i|
          if v.respond_to?(:each)
            make_data_msgpack_compatible(v)
          elsif v.class == Time
            data[i] = v.to_i
          end
        }
      end
    end

    class TimerWatcher < Coolio::TimerWatcher
      def initialize(interval, repeat, log, &callback)
        @callback = callback
        @log = log
        super(interval, repeat)
      end

      def on_timer
        @callback.call
      rescue
        @log.error $!.to_s
        @log.error_backtrace
      end
    end
  end
end
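
The sanitizing and tag-building steps of the plugin can be tried without a MongoDB server or the fluentd and mongo gems. The following is a minimal standalone sketch, not part of the plugin: `sanitize_for_msgpack!` and `build_tag` are hypothetical names introduced here, the sample hash is made-up data shaped loosely like a serverStatus response, and the sketch uses `is_a?` checks where the plugin compares classes exactly.

```ruby
# Standalone sketch of the plugin's sanitizing logic (hypothetical helper
# names; plain Ruby only, no mongo or fluentd gems required).

# Recursively replaces Time values with integer Unix timestamps and
# renames a "." key to "global", mirroring make_data_msgpack_compatible.
def sanitize_for_msgpack!(data)
  case data
  when Hash
    data.each do |key, value|
      if value.respond_to?(:each)
        sanitize_for_msgpack!(value)
      elsif value.is_a?(Time)
        data[key] = value.to_i # overwriting an existing key is safe while iterating
      end
    end
    # Rename only after iteration, because adding a key mid-iteration raises.
    data["global"] = data.delete(".") if data.key?(".")
  when Array
    data.each_with_index do |value, index|
      if value.respond_to?(:each)
        sanitize_for_msgpack!(value)
      elsif value.is_a?(Time)
        data[index] = value.to_i
      end
    end
  end
  data
end

# Builds a tag the same way collect_serverstatus does: dots and dashes in
# the host name become underscores so they do not add extra tag segments.
def build_tag(prefix, host, port)
  [prefix, host.gsub(/[.-]/, "_"), port].join(".")
end

# Made-up sample data, not real serverStatus output.
sample = {
  "localTime" => Time.at(1_403_710_980),
  "locks"     => { "." => { "timeLockedMicros" => { "R" => 1 } } },
  "history"   => [Time.at(0), { "at" => Time.at(60) }]
}

sanitize_for_msgpack!(sample)
puts build_tag("serverstatus", "db-1.example.com", 27017)
# prints "serverstatus.db_1_example_com.27017"
```

After the call, `sample["localTime"]` holds the integer `1403710980`, the nested `"."` key under `"locks"` has been renamed to `"global"`, and the Time values inside the array have been converted in place.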