Skip to content

Instantly share code, notes, and snippets.

@kml
Last active August 29, 2015 14:02
Show Gist options
  • Save kml/334a6cb73d259f8dedec to your computer and use it in GitHub Desktop.
Save kml/334a6cb73d259f8dedec to your computer and use it in GitHub Desktop.
activemq-stats-ruby
.bundle
bundle

Read ActiveMQ stats using Ruby

Building standalone package

rm -rf ./bundle
bundle install --standalone

Global stats

./statistics.rb
{
  "vm": "vm://host1",
  "memoryLimit": 20971520,
  "memoryUsage": 284730,
  "memoryPercentUsage": 1,
  ...
}

./statistics.rb -H host1 -k memoryUsage
284730

./statistics.rb -H host1 -f raw
{"map":{"entry":[{"string":["vm","vm:\/\/host1"]},{"string":"memoryUsage","long":284730}...

./statistics.rb -H host1 -f zabbix --zabbix-host myhost --zabbix-prefix activemq. -t numeric
myhost activemq.memoryLimit 20971520
myhost activemq.memoryUsage 284730
myhost activemq.memoryPercentUsage 1
...

Selected queue stats

./statistics.rb -H host1 -q QUEUE_NAME
{
  "size": 0,
  "consumerCount": 2,
  "enqueueCount": 620917,
  "dequeueCount": 620917,
  ...
}
source "http://rubygems.org"
gem "stomp", "1.1.9"
gem "json_pure", "1.6.1"
GEM
remote: http://rubygems.org/
specs:
json_pure (1.6.1)
stomp (1.1.9)
PLATFORMS
ruby
DEPENDENCIES
json_pure (= 1.6.1)
stomp (= 1.1.9)
#!/usr/bin/env ruby
#encoding: utf-8
begin
require File.expand_path("../bundle/bundler/setup", __FILE__)
rescue LoadError
require "bundler"
Bundler.setup
end
require "optparse"
require "stomp"
require "json/pure"
require "benchmark"
require "socket"
require "timeout"
# http://activemq.apache.org/statisticsplugin.html
# http://activemq.2283324.n4.nabble.com/statisticsBrokerPlugin-with-Stomp-td2353331.html
NUMERIC_TYPES = ['double', 'long', 'int']
ALLOWED_TYPES = NUMERIC_TYPES + ['string']
options = {
:port => 61613,
:host => nil,
:format => "json",
:types => [],
:receive_timeout => 5
}
OptionParser.new do |opts|
opts.banner = "Usage: #{$0} [options]"
opts.on("-H", "--host HOST", "Hostname") do |host|
options[:host] = host
end
opts.on("-f", "--format FORMAT", "Select format") do |format|
options[:format] = format
end
opts.on("-p", "--port PORT", "Port number") do |port|
options[:port] = port
end
opts.on("-k", "--key KEY", "Selected key") do |key|
options[:key] = key
end
opts.on("-T", "--time", "Get real enqueue time") do |time|
options[:time] = time
end
opts.on("-t", "--type TYPES", "Selected types") do |types|
options[:types] = types.split(',')
if options[:types].include?('numeric')
options[:types].delete('numeric')
options[:types] |= NUMERIC_TYPES
end
invalid_types = options[:types] - ALLOWED_TYPES
unless invalid_types.empty?
abort "Invalid type(s): #{invalid_types.join(', ')}\n" +
"Allowed: #{ALLOWED_TYPES.join(', ')}\n" +
"Virtual type `numeric` includes: #{NUMERIC_TYPES.join(', ')}"
end
end
opts.on("-q", "--queue QUEUE", "Queue name") do |queue|
options[:queue] = queue
end
# zabbix format specific options
opts.on("--zabbix-prefix PREFIX", "Select prefix") do |prefix|
options[:zabbix_prefix] = prefix
end
opts.on("--zabbix-host HOST", "Select host") do |host|
options[:zabbix_host] = host
end
opts.on("--receive-timeout SECONDS", "Receive timeout (0 - wait forever, default: #{options[:receive_timeout]})") do |seconds|
options[:receive_timeout] = Integer(seconds)
end
opts.on("-h", "--help", "Show this message") do
puts opts
exit
end
end.parse!
def jms_map_simplify(p, types = [])
a = p['map']['entry'].map do |entry|
keys = entry.keys
if keys.size == 1 && keys.include?('string')
if types.empty? || types.include?('string')
[entry['string'][0], entry['string'][1]]
end
elsif keys.size == 2 && keys.include?('string')
type = (keys - ['string']).first
if types.empty? || types.include?(type)
[entry['string'], entry[type]]
end
end
end.compact
Hash[a]
end
def silence_stream(stream)
old_stream = stream.dup
stream.reopen(RUBY_PLATFORM =~ /(:?mswin|mingw)/ ? 'NUL:' : '/dev/null')
stream.sync = true
yield
ensure
stream.reopen(old_stream)
end
connection_hash = {
:hosts => [
{:login => "guest", :passcode => "guest", :host => options[:host], :port => options[:port], :ssl => false}
],
:initial_reconnect_delay => 2,
:max_reconnect_attempts => 2
}
conn = nil
begin
silence_stream(STDERR) do
conn = Stomp::Connection.open(connection_hash)
end
rescue
puts '-1'
exit
end
body = nil
total_time = Benchmark.realtime do
if options[:queue]
conn.subscribe("/queue/STATS.#{options[:queue]}", {"transformation" => "jms-map-json"})
conn.publish("/queue/ActiveMQ.Statistics.Destination.#{options[:queue]}", "", {"reply-to" => "/queue/STATS.#{options[:queue]}"})
else
conn.subscribe("/queue/STATS", {"transformation" => "jms-map-json"})
conn.publish("/queue/ActiveMQ.Statistics.Broker", "", {"reply-to" => "/queue/STATS"})
end
begin
Timeout.timeout(options[:receive_timeout]) do
body = conn.receive.body
end
rescue Timeout::Error
puts '-1'
exit
end
conn.disconnect
end
if options[:time]
puts total_time
exit
end
if options[:format] == "raw"
puts body
exit
end
if options[:format] == "zabbix"
hash = jms_map_simplify(JSON.parse(body), options[:types])
options[:zabbix_host] ||= Socket.gethostname
options[:zabbix_prefix] ||= ""
if options[:key].nil?
hash.each do |key, value|
puts "#{options[:zabbix_host]} #{options[:zabbix_prefix]}#{key} #{value}"
end
else
hash.select {|n, v| n == options[:key] }.each do |key, value|
puts "#{options[:zabbix_host]} #{options[:zabbix_prefix]}#{key} #{value}"
end
end
exit
end
if options[:format] == "json"
hash = jms_map_simplify(JSON.parse(body), options[:types])
puts options[:key].nil? ? hash.to_json : hash[options[:key]]
exit
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment