|
#!/usr/bin/env ruby |
|
#encoding: utf-8 |
|
|
|
begin |
|
require File.expand_path("../bundle/bundler/setup", __FILE__) |
|
rescue LoadError |
|
require "bundler" |
|
Bundler.setup |
|
end |
|
|
|
require "optparse" |
|
require "stomp" |
|
require "json/pure" |
|
require "benchmark" |
|
require "socket" |
|
require "timeout" |
|
|
|
# http://activemq.apache.org/statisticsplugin.html |
|
# http://activemq.2283324.n4.nabble.com/statisticsBrokerPlugin-with-Stomp-td2353331.html |
|
|
|
NUMERIC_TYPES = ['double', 'long', 'int'] |
|
ALLOWED_TYPES = NUMERIC_TYPES + ['string'] |
|
|
|
options = { |
|
:port => 61613, |
|
:host => nil, |
|
:format => "json", |
|
:types => [], |
|
:receive_timeout => 5 |
|
} |
|
|
|
OptionParser.new do |opts| |
|
opts.banner = "Usage: #{$0} [options]" |
|
opts.on("-H", "--host HOST", "Hostname") do |host| |
|
options[:host] = host |
|
end |
|
opts.on("-f", "--format FORMAT", "Select format") do |format| |
|
options[:format] = format |
|
end |
|
opts.on("-p", "--port PORT", "Port number") do |port| |
|
options[:port] = port |
|
end |
|
opts.on("-k", "--key KEY", "Selected key") do |key| |
|
options[:key] = key |
|
end |
|
opts.on("-T", "--time", "Get real enqueue time") do |time| |
|
options[:time] = time |
|
end |
|
opts.on("-t", "--type TYPES", "Selected types") do |types| |
|
options[:types] = types.split(',') |
|
|
|
if options[:types].include?('numeric') |
|
options[:types].delete('numeric') |
|
options[:types] |= NUMERIC_TYPES |
|
end |
|
|
|
invalid_types = options[:types] - ALLOWED_TYPES |
|
|
|
unless invalid_types.empty? |
|
abort "Invalid type(s): #{invalid_types.join(', ')}\n" + |
|
"Allowed: #{ALLOWED_TYPES.join(', ')}\n" + |
|
"Virtual type `numeric` includes: #{NUMERIC_TYPES.join(', ')}" |
|
end |
|
end |
|
opts.on("-q", "--queue QUEUE", "Queue name") do |queue| |
|
options[:queue] = queue |
|
end |
|
|
|
# zabbix format specific options |
|
opts.on("--zabbix-prefix PREFIX", "Select prefix") do |prefix| |
|
options[:zabbix_prefix] = prefix |
|
end |
|
opts.on("--zabbix-host HOST", "Select host") do |host| |
|
options[:zabbix_host] = host |
|
end |
|
|
|
opts.on("--receive-timeout SECONDS", "Receive timeout (0 - wait forever, default: #{options[:receive_timeout]})") do |seconds| |
|
options[:receive_timeout] = Integer(seconds) |
|
end |
|
|
|
opts.on("-h", "--help", "Show this message") do |
|
puts opts |
|
exit |
|
end |
|
end.parse! |
|
|
|
def jms_map_simplify(p, types = []) |
|
a = p['map']['entry'].map do |entry| |
|
keys = entry.keys |
|
|
|
if keys.size == 1 && keys.include?('string') |
|
if types.empty? || types.include?('string') |
|
[entry['string'][0], entry['string'][1]] |
|
end |
|
elsif keys.size == 2 && keys.include?('string') |
|
type = (keys - ['string']).first |
|
if types.empty? || types.include?(type) |
|
[entry['string'], entry[type]] |
|
end |
|
end |
|
end.compact |
|
|
|
Hash[a] |
|
end |
|
|
|
def silence_stream(stream) |
|
old_stream = stream.dup |
|
stream.reopen(RUBY_PLATFORM =~ /(:?mswin|mingw)/ ? 'NUL:' : '/dev/null') |
|
stream.sync = true |
|
yield |
|
ensure |
|
stream.reopen(old_stream) |
|
end |
|
|
|
connection_hash = { |
|
:hosts => [ |
|
{:login => "guest", :passcode => "guest", :host => options[:host], :port => options[:port], :ssl => false} |
|
], |
|
:initial_reconnect_delay => 2, |
|
:max_reconnect_attempts => 2 |
|
} |
|
|
|
conn = nil |
|
begin |
|
silence_stream(STDERR) do |
|
conn = Stomp::Connection.open(connection_hash) |
|
end |
|
rescue |
|
puts '-1' |
|
exit |
|
end |
|
|
|
body = nil |
|
total_time = Benchmark.realtime do |
|
if options[:queue] |
|
conn.subscribe("/queue/STATS.#{options[:queue]}", {"transformation" => "jms-map-json"}) |
|
conn.publish("/queue/ActiveMQ.Statistics.Destination.#{options[:queue]}", "", {"reply-to" => "/queue/STATS.#{options[:queue]}"}) |
|
else |
|
conn.subscribe("/queue/STATS", {"transformation" => "jms-map-json"}) |
|
conn.publish("/queue/ActiveMQ.Statistics.Broker", "", {"reply-to" => "/queue/STATS"}) |
|
end |
|
|
|
begin |
|
Timeout.timeout(options[:receive_timeout]) do |
|
body = conn.receive.body |
|
end |
|
rescue Timeout::Error |
|
puts '-1' |
|
exit |
|
end |
|
|
|
conn.disconnect |
|
end |
|
|
|
if options[:time] |
|
puts total_time |
|
exit |
|
end |
|
|
|
if options[:format] == "raw" |
|
puts body |
|
exit |
|
end |
|
|
|
if options[:format] == "zabbix" |
|
hash = jms_map_simplify(JSON.parse(body), options[:types]) |
|
|
|
options[:zabbix_host] ||= Socket.gethostname |
|
options[:zabbix_prefix] ||= "" |
|
|
|
if options[:key].nil? |
|
hash.each do |key, value| |
|
puts "#{options[:zabbix_host]} #{options[:zabbix_prefix]}#{key} #{value}" |
|
end |
|
else |
|
hash.select {|n, v| n == options[:key] }.each do |key, value| |
|
puts "#{options[:zabbix_host]} #{options[:zabbix_prefix]}#{key} #{value}" |
|
end |
|
end |
|
exit |
|
end |
|
|
|
if options[:format] == "json" |
|
hash = jms_map_simplify(JSON.parse(body), options[:types]) |
|
puts options[:key].nil? ? hash.to_json : hash[options[:key]] |
|
exit |
|
end |
|
|