-
-
Save yaauie/40a9176f354f0100db47 to your computer and use it in GitHub Desktop.
put this in the bin of the kafka tarball you downloaded from https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.2.1/kafka_2.10-0.8.2.1.tgz and invoke it with the path to the zookeeper node as its only argument.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby | |
require 'shellwords' | |
END { | |
zookeeper = ARGV.shift || fail("Usage #{$0} <zookeeper>") | |
$stderr.puts("Getting stats via #{zookeeper}") | |
snap1 = with_progress('capturing first snapshot') { capture_snapshot(zookeeper) } | |
countdown('waiting 10 minutes for second snapshot', 600) | |
snap2 = with_progress('capturing second snapshot') { capture_snapshot(zookeeper) } | |
(snap1.keys|snap2.keys).map do |key| | |
[key, snap1[key],snap2[key]] | |
end.map do |key, a,b| | |
compare(key, a, b, $stdout) | |
end.compact | |
lag_a, lag_b = [snap1.values,snap2.values].map do |snap| | |
lag_sum = snap.reduce(0) { |memo, item| memo + item.fetch(:Lag,0) } | |
time = snap.first[:Time] | |
{Lag: lag_sum, Time: time} | |
end | |
compare('TOTAL LAG', lag_a, lag_b, $stderr) | |
} if $0 == __FILE__ | |
def compare(key, a, b, io=$stdout) | |
io.write("#{key}: #{(b && b[:Lag])|| '---'} ") | |
if a.nil? | |
io.puts("(added)") | |
elsif b.nil? | |
io.puts("(removed)") | |
elsif b[:Lag] < 100 | |
io.puts("(ok)") | |
else | |
d_lag = b[:Lag]-a[:Lag]; | |
d_time = b[:Time]-a[:Time]; | |
g_rate_per_minute = ((d_lag.to_f/d_time.to_f)*60).to_i; | |
if d_lag == 0 || g_rate_per_minute == 0 | |
io.puts("(constant)") | |
elsif d_lag > 0 | |
io.puts("(growing at #{g_rate_per_minute}/min)") | |
else | |
minutes_to_catchup = -1 * (b[:Lag]/g_rate_per_minute); | |
h,m = minutes_to_catchup.divmod(60); | |
io.puts("(catching up at #{g_rate_per_minute.abs}/min; eta: #{sprintf('%02d:%02d',h,m)})") | |
end | |
end | |
end | |
def with_progress(intro, io = $stderr) | |
io.write("#{intro}...") | |
io.flush | |
th = Thread.new do | |
loop do | |
io.write(?.) | |
io.flush | |
sleep 1 | |
end | |
end | |
return yield | |
ensure | |
th.kill | |
io.write("\r\033[J") | |
io.flush | |
end | |
def countdown(intro, duration, io=$stderr) | |
th = Thread.new do | |
io.write("#{intro}: ") | |
io.flush | |
target_time = Time.now + duration | |
loop do | |
begin | |
remaining = (target_time - Time.now).to_i | |
rh,rm = remaining.divmod(60) | |
io.write(sprintf("%02d:%02d", rh, rm)) | |
io.flush | |
sleep 1 | |
io.write("\033[5D") | |
io.flush | |
end | |
end | |
end | |
sleep duration | |
ensure | |
th.kill | |
io.write("\r\033[J") | |
io.flush | |
end | |
def capture_snapshot(zookeeper) | |
offset_checker = File.expand_path('kafka-consumer-offset-checker.sh', (ENV['KAFKA_BIN']||File.dirname(__FILE__))) | |
unless File.exists?(offset_checker) && File.executable?(offset_checker) | |
fail("#{File.basename(offset_checker)} not found at #{File.dirname(offset_checker)}") | |
end | |
time = Time.now | |
raw_result = `#{offset_checker.shellescape} --group phoenix_consumer_2016_02_07 --topic dryad_persist_1 --zookeeper #{zookeeper.shellescape} 2>/dev/null` | |
fail(raw_result) if $?.exitstatus > 0 | |
result = raw_result.lines.map(&:chomp).map(&:split) | |
headers = result.shift.map(&:to_sym) | |
result.each_with_object({}) do |item, m0| | |
hash = headers.zip(item).each_with_object({Time: time}) do |(header, value), m1| | |
value = Integer(value) if value.match(/\A[0-9]+\Z/) | |
m1[header] = value | |
end | |
m0[hash[:Owner]] = hash | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment