Skip to content

Instantly share code, notes, and snippets.

@killme2008
Last active December 11, 2015 02:09
Show Gist options
  • Save killme2008/4528938 to your computer and use it in GitHub Desktop.
Save killme2008/4528938 to your computer and use it in GitHub Desktop.
A script to monitor MetaQ queue size
####
# Description:a ruby script to monitor metaq queue size
# Requirements: zookeeper
# sudo gem install zookeeper
#
#####
require 'rubygems'
require 'zookeeper'
require 'socket'
class Partition
attr_accessor :broker_id,:partition
def initialize(broker_id, partition)
@broker_id = broker_id
@partition = partition
end
def to_s
return "#{@broker_id}-#{@partition}"
end
def inspect
return to_s
end
def <=>(other)
if @broker_id != other.broker_id
return @broker_id <=> other.broker_id
else
return @partition <=> other.partition
end
end
end
class MetaQMonitor
def initialize(zk_servers, metaq_server="localhost:8123", zk_root="/avos-fetch-meta", topic="avos-fetch-tasks", group="avos-fetcher" ,warn_threshold = 512*1024 , critical_threshold = 5*1024*1024)
@zk = Zookeeper.new(zk_servers)
@metaq_server = metaq_server
@topic = topic
@zk_root = zk_root
@group = group
@warn_threshold = warn_threshold
@critical_threshold = critical_threshold
@broker_topic_path = "#{@zk_root}/brokers/topics"
@consumer_offset_path = "#{@zk_root}/consumers/#{@group}/offsets/#{@topic}"
end
def run
broker_id = get_broker_id
parts = get_parts(broker_id)
criticals = []
warns = []
parts.each do |part|
offset_in_zk = get_zk_offset(part)
offset_in_broker = get_broker_offset(part)
diff = offset_in_broker - offset_in_zk
if diff > @critical_threshold
criticals << "Partition #{part} backlog too many messages:#{diff} Bytes (critical)"
elsif diff > @warn_threshold
warns << "Partition #{part} backlog too many messages:#{diff} Bytes (warn)"
end
end
exit_code = if criticals.size >0 then 2 elsif warns.size >0 then 1 else 0 end
criticals.each{|c| puts c}
warns.each{|w| puts w}
exit_code
end
def get_zk_offset(part)
_,offset= safe_zk_get("#{@consumer_offset_path}/#{part}")[:data].split "-"
return offset.to_i
end
def get_broker_offset(part)
#line: avos-fetch-tasks part 6 min_offset 0 max_offset 72492782
lines = stats_broker("offsets")
pat = Regexp.new("#{@topic} part #{part.partition} min_offset [0-9]+ max_offset ([0-9]+)\r\n")
lines.each do |line|
return $1.to_i if line =~ pat
end
end
def safe_zk_get(path, count=0)
begin
return @zk.get({ :path => path })
rescue Exception => e
if count >= 3
raise e
else
safe_zk_close()
@zk = Zookeeper.new(@zk_servers)
return safe_zk_get(path, count.succ)
end
end
end
def get_parts(broker_id)
n_parts = safe_zk_get("#{@broker_topic_path}/#{@topic}/#{broker_id}-m")[:data].to_i
(0..n_parts-1).collect do | n |
Partition.new(broker_id, n)
end
end
def get_broker_id
lines = stats_broker
lines.each do |line|
if line =~ /broker_id (\d+)\r\n/
return $1.to_i
end
end
end
def stats_broker(item="")
host,port = @metaq_server.split ":"
socket = TCPSocket.open(host,port.to_i)
socket.write "stats #{item}\r\n"
socket.flush
lines = []
line = socket.readline
while line != "END\r\n"
line = socket.readline
lines << line
end
socket.close
return lines
end
end
if __FILE__ == $0
begin
monitor = MetaQMonitor.new("zk1:2181,zk2:2181,zk3:2181")
exit monitor.run
rescue => e
puts "#{e.backtrace.join('\n')} #{e.message}"
#unknown for nagios
exit 3
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment