Skip to content

Instantly share code, notes, and snippets.

@killme2008
Created March 17, 2013 06:59
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save killme2008/5180447 to your computer and use it in GitHub Desktop.
Save killme2008/5180447 to your computer and use it in GitHub Desktop.
Query queue length in metaq.
####
# Description: a Ruby script that monitors MetaQ queue size
#              (per-partition backlog of a consumer group).
# Requirements: the zookeeper gem
#   sudo gem install zookeeper
#
#####
require 'rubygems'
require 'zookeeper'
require 'socket'
# Identifies one partition of a topic on one broker.
#
# Instances render as "<broker_id>-<partition>" and are ordered by broker id
# first, then by partition number. Comparable is mixed in so that the ordering
# defined by <=> is also available through <, <=, ==, >=, > and between?.
class Partition
  include Comparable

  attr_accessor :broker_id, :partition

  # broker_id:: id of the broker that owns the partition
  # partition:: partition number within that broker
  def initialize(broker_id, partition)
    @broker_id = broker_id
    @partition = partition
  end

  # Returns the "<broker_id>-<partition>" form of this partition.
  def to_s
    "#{@broker_id}-#{@partition}"
  end

  # Same text as to_s, so partitions print compactly inside collections.
  def inspect
    to_s
  end

  # Orders by broker id, then by partition number.
  #
  # Returns -1, 0 or 1, or nil when +other+ does not expose broker_id and
  # partition (returning nil instead of raising keeps == safe to call with
  # arbitrary objects).
  def <=>(other)
    return nil unless other.respond_to?(:broker_id) && other.respond_to?(:partition)

    [@broker_id, @partition] <=> [other.broker_id, other.partition]
  end

  # Strict equality used for Hash keys: same class and same field values.
  def eql?(other)
    other.class == self.class &&
      @broker_id.eql?(other.broker_id) &&
      @partition.eql?(other.partition)
  end

  # Hash code consistent with eql?.
  def hash
    [self.class, @broker_id, @partition].hash
  end
end
# Reports, for every partition of a topic on one MetaQ broker, how far the
# consumer group's stored offset is behind the broker's max offset.
#
# The consumer offsets are read from ZooKeeper; the broker id and the broker's
# offsets are read from the broker's text "stats" command over TCP.
class MetaQOffsetsQuery
  # Line that terminates a broker "stats" response.
  STATS_TERMINATOR = "END\r\n".freeze

  # Number of reconnect-and-retry rounds safe_zk_get performs before re-raising.
  MAX_ZK_RETRIES = 3

  # zk_servers::   ZooKeeper connection string handed to Zookeeper.new
  # metaq_server:: "host:port" of the broker to query
  # zk_root::      root path under which the broker/consumer nodes live
  # topic::        topic whose partitions are inspected
  # group::        consumer group whose offsets are inspected
  def initialize(zk_servers, metaq_server="localhost:8123", zk_root="/avos-fetch-meta", topic="avos-fetch-tasks", group="avos-fetcher")
    # Remembered so safe_zk_get can reconnect to the same servers after a failure.
    @zk_servers = zk_servers
    @zk = Zookeeper.new(zk_servers)
    @metaq_server = metaq_server
    @topic = topic
    @zk_root = zk_root
    @group = group
    @broker_topic_path = "#{@zk_root}/brokers/topics"
    @consumer_offset_path = "#{@zk_root}/consumers/#{@group}/offsets/#{@topic}"
  end

  # Prints one "<partition>:<backlog>" line per partition, where backlog is the
  # broker's max offset minus the offset stored in ZooKeeper.
  #
  # Returns the sum of all per-partition backlogs.
  def run
    broker_id = get_broker_id
    get_parts(broker_id).inject(0) do |total, part|
      offset_in_zk = get_zk_offset(part)
      offset_in_broker = get_broker_offset(part)
      backlog = offset_in_broker - offset_in_zk
      puts "#{part}:#{backlog}"
      total + backlog
    end
  end

  # Returns the consumer group's stored offset for +part+ as an Integer.
  #
  # The node data is split on "-" and the second field is taken as the offset.
  def get_zk_offset(part)
    _, offset = safe_zk_get("#{@consumer_offset_path}/#{part}")[:data].split("-")
    offset.to_i
  end

  # Returns the broker's max offset for +part+ as an Integer.
  #
  # Raises RuntimeError when the broker's "stats offsets" output has no line
  # for this topic and partition.
  def get_broker_offset(part)
    # Example line: avos-fetch-tasks part 6 min_offset 0 max_offset 72492782
    # The topic is escaped so regexp metacharacters in it are matched literally.
    pattern = Regexp.new("#{Regexp.escape(@topic)} part #{part.partition} min_offset [0-9]+ max_offset ([0-9]+)\r\n")
    stats_broker("offsets").each do |line|
      match = pattern.match(line)
      return match[1].to_i if match
    end
    raise "no offsets reported by #{@metaq_server} for topic #{@topic} partition #{part.partition}"
  end

  # Reads +path+ from ZooKeeper and returns whatever the client's get returns.
  #
  # On an error the current connection is closed (best effort), a new one is
  # opened and the read is retried; after MAX_ZK_RETRIES retries the last
  # error is re-raised. +count+ is the number of retries already performed.
  def safe_zk_get(path, count=0)
    @zk.get({ :path => path })
  rescue StandardError
    raise if count >= MAX_ZK_RETRIES

    safe_zk_close
    @zk = Zookeeper.new(@zk_servers)
    safe_zk_get(path, count.succ)
  end

  # Returns one Partition per partition of the topic on +broker_id+, numbered
  # from 0. The partition count is read from the "<broker_id>-m" node.
  def get_parts(broker_id)
    n_parts = safe_zk_get("#{@broker_topic_path}/#{@topic}/#{broker_id}-m")[:data].to_i
    (0...n_parts).map { |n| Partition.new(broker_id, n) }
  end

  # Returns the broker id (Integer) reported by the broker's "stats" output.
  #
  # Raises RuntimeError when no "broker_id <n>" line is present.
  def get_broker_id
    stats_broker.each do |line|
      match = /broker_id (\d+)\r\n/.match(line)
      return match[1].to_i if match
    end
    raise "broker #{@metaq_server} did not report a broker_id"
  end

  # Sends "stats <item>" to the broker and returns every response line that
  # precedes the terminating "END" line (line endings are kept).
  #
  # Raises EOFError if the connection ends before the terminator arrives.
  def stats_broker(item="")
    host, port = @metaq_server.split(":")
    socket = TCPSocket.open(host, port.to_i)
    begin
      socket.write "stats #{item}\r\n"
      socket.flush
      lines = []
      loop do
        line = socket.readline
        break if line == STATS_TERMINATOR

        lines << line
      end
      lines
    ensure
      # Always release the connection, even when reading fails part-way.
      socket.close
    end
  end

  private

  # Closes the current ZooKeeper connection. Failures are ignored on purpose:
  # this only runs while recovering from an error on that same connection.
  def safe_zk_close
    @zk.close if @zk
  rescue StandardError
    nil
  end
end
# Script entry point: when this file is executed directly, query every broker
# in the list below and print each partition's backlog.
if __FILE__ == $0
  begin
    brokers = ["localhost:8123"]
    brokers.each do |broker|
      query = MetaQOffsetsQuery.new("localhost:2181", broker, "/avos-fetch-meta")
      query.run
    end
  rescue => e
    # Double quotes are required here: '\n' in single quotes is a literal
    # backslash followed by "n", which would glue all frames onto one line.
    puts "#{e.backtrace.join("\n")} #{e.message}"
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment