Skip to content

Instantly share code, notes, and snippets.

@felippemr
Forked from chrisckchang/in_serverstatus.rb
Last active April 19, 2016 20:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save felippemr/6d39fcf35f51fb7a2e38 to your computer and use it in GitHub Desktop.
Save felippemr/6d39fcf35f51fb7a2e38 to your computer and use it in GitHub Desktop.
MongoDB serverStatus output plugin which works with mongoDB 3.0
module Fluent
class ServerStatusInput < Input
Plugin.register_input('serverstatus', self)
config_param :uris, :array, :default => nil
config_param :uri, :string
config_param :stats_interval, :time, :default => 60 # every minute
config_param :tag_prefix, :string, :default => "serverstatus"
def initialize
super
require 'mongo'
end
def configure(conf)
super
unless @uris or @uri
raise ConfigError, 'uris or uri must be specified'
end
if @uris.nil?
@uris = [@uri]
end
@conns = get_conection
end
def get_conection
@uris.map do |uri_str|
uri_str = "mongodb://#{uri_str}" if not uri_str.start_with?("mongodb://")
uri = get_uri(uri_str)
client = Mongo::Client.new(uri.servers, uri.options)
[client, uri]
end
end
def get_uri(uri_str)
if uri_str.include?('@')
user, password = uri_str.split('@')[0].split('//')[1].split(':')
uri = Mongo::URI.new(uri_str, :connect => :direct, :user => user, :password => password)
else
uri = Mongo::URI.new(uri_str, :connect => :direct)
end
uri
end
def start
@loop = Coolio::Loop.new
tw = TimerWatcher.new(@stats_interval, true, @log, &method(:collect_serverstatus))
tw.attach(@loop)
@thread = Thread.new(&method(:run))
end
def run
@loop.run
rescue
log.error "unexpected error", :error=>$!.to_s
log.error_backtrace
end
def shutdown
@loop.stop
@thread.join
end
def reconnect
for conn, conn_uri in @conns
conn.reconnect
end
end
def collect_serverstatus
begin
for conn, conn_uri in @conns
database = conn.database
stats = database.command(:serverStatus => :true).first
make_data_msgpack_compatible(stats)
host, port = conn_uri.servers[0].split(':')
tag = [@tag_prefix, host.gsub(/[\.-]/, "_"), port].join(".")
Engine.emit(tag, Engine.now, stats)
end
rescue => e
log.error "failed to collect MongoDB stats", :error_class => e.class, :error => e
log.info "trying to reconnect on mongodb..."
reconnect
end
end
# MessagePack doesn't like it when the field is of Time class.
# This is a convenient method that traverses through the
# getServerStatus response and update any field that is of Time class.
def make_data_msgpack_compatible(data)
if [Hash, BSON::Document].include?(data.class)
data.each {|k, v|
if v.respond_to?(:each)
make_data_msgpack_compatible(v)
elsif v.class == Time
data[k] = v.to_i
elsif v.class == BSON::ObjectId
data[k] = v.to_s
end
}
# serverStatus's "locks" field has "." as a key, which can't be
# inserted back to MongoDB withou wreaking havoc. Replace it with
# "global"
data["global"] = data.delete(".") if data["."]
elsif data.class == Array
data.each_with_index { |v, i|
if v.respond_to?(:each)
make_data_msgpack_compatible(v)
elsif v.class == Time
data[i] = v.to_i
end
}
end
end
class TimerWatcher < Coolio::TimerWatcher
def initialize(interval, repeat, log, &callback)
@callback = callback
@log = log
super(interval, repeat)
end
def on_timer
@callback.call
rescue
@log.error $!.to_s
@log.error_backtrace
end
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment