Skip to content

Instantly share code, notes, and snippets.

@yaauie
Last active February 9, 2016 06:48
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yaauie/40a9176f354f0100db47 to your computer and use it in GitHub Desktop.
Save yaauie/40a9176f354f0100db47 to your computer and use it in GitHub Desktop.
Put this script in the `bin` directory of the Kafka tarball you downloaded from https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.2.1/kafka_2.10-0.8.2.1.tgz, and invoke it with the path to the ZooKeeper node as its only argument.
#!/usr/bin/env ruby
require 'shellwords'
# Script entry point.
#
# Wrapped in END so the body runs only after the method definitions that
# follow it in this file have been evaluated, and guarded so it only fires
# when the file is executed directly.
#
# Captures two snapshots ten minutes apart, writes one comparison line per
# snapshot key to stdout, then writes the change in total lag to stderr.
END {
  zookeeper = ARGV.shift || fail("Usage #{$0} <zookeeper>")
  $stderr.puts("Getting stats via #{zookeeper}")

  snap1 = with_progress('capturing first snapshot') { capture_snapshot(zookeeper) }
  taken1 = Time.now
  countdown('waiting 10 minutes for second snapshot', 600)
  snap2 = with_progress('capturing second snapshot') { capture_snapshot(zookeeper) }
  taken2 = Time.now

  # Walk every key seen in either snapshot; a key missing from one side
  # yields nil there, which compare reports as added/removed.
  (snap1.keys | snap2.keys).each do |key|
    compare(key, snap1[key], snap2[key], $stdout)
  end

  lag_a, lag_b = [[snap1, taken1], [snap2, taken2]].map do |snap, taken_at|
    rows = snap.values
    lag_sum = rows.reduce(0) { |memo, item| memo + item.fetch(:Lag, 0) }
    # An empty snapshot has no row to read :Time from, so fall back to the
    # time recorded right after the capture instead of calling [] on nil.
    time = rows.empty? ? taken_at : rows.first[:Time]
    { Lag: lag_sum, Time: time }
  end
  compare('TOTAL LAG', lag_a, lag_b, $stderr)
} if $0 == __FILE__
# Writes a one-line lag report for +key+ to +io+, comparing two samples.
#
# key - label printed at the start of the line.
# a   - earlier sample: a Hash with :Lag and :Time entries, or nil when the
#       key was absent from the earlier snapshot.
# b   - later sample in the same shape, or nil when the key has disappeared.
# io  - object responding to #write and #puts (defaults to $stdout).
#
# The line is "<key>: <current lag or ---> (<status>)" where status is one of
# added, removed, ok (current lag below 100), constant, rate unknown,
# growing at N/min, or catching up at N/min with an hh:mm ETA.
#
# Returns nil.
def compare(key, a, b, io=$stdout)
  io.write("#{key}: #{(b && b[:Lag]) || '---'} ")

  if a.nil?
    io.puts("(added)")
  elsif b.nil?
    io.puts("(removed)")
  elsif b[:Lag] < 100
    io.puts("(ok)")
  else
    d_lag = b[:Lag] - a[:Lag]
    d_time = (b[:Time] - a[:Time]).to_f

    if d_lag == 0
      # Decide this before dividing: with no elapsed time 0.0/0.0 is NaN and
      # NaN#to_i raises FloatDomainError.
      io.puts("(constant)")
    elsif d_time <= 0
      # The lag changed but no time passed, so no finite rate can be derived.
      io.puts("(rate unknown)")
    else
      g_rate_per_minute = ((d_lag.to_f / d_time) * 60).to_i

      if g_rate_per_minute == 0
        # Change is too slow to register as a whole unit per minute.
        io.puts("(constant)")
      elsif d_lag > 0
        io.puts("(growing at #{g_rate_per_minute}/min)")
      else
        # The rate is negative here; integer division floors toward negative
        # infinity, so negating the quotient rounds the ETA up.
        minutes_to_catchup = -1 * (b[:Lag] / g_rate_per_minute)
        h, m = minutes_to_catchup.divmod(60)
        io.puts("(catching up at #{g_rate_per_minute.abs}/min; eta: #{sprintf('%02d:%02d', h, m)})")
      end
    end
  end
end
# Runs the given block while showing a progress indicator on +io+.
#
# Writes "<intro>..." once, then a background thread appends one "." per
# second until the block finishes. Afterwards the line is wiped with a
# carriage return followed by the ANSI "erase to end of screen" sequence.
#
# intro - text shown before the dots.
# io    - object responding to #write and #flush (defaults to $stderr).
#
# Returns whatever the block returns. Exceptions raised by the block or by
# +io+ propagate to the caller.
def with_progress(intro, io = $stderr)
  ticker = nil
  io.write("#{intro}...")
  io.flush
  ticker = Thread.new do
    loop do
      io.write(?.)
      io.flush
      sleep 1
    end
  end
  yield
ensure
  # The ticker is still nil when the initial write raised; guarding here
  # keeps that original exception from being replaced by a NoMethodError.
  ticker.kill if ticker
  io.write("\r\033[J")
  io.flush
end
# Blocks the caller for +duration+ seconds while a background thread shows
# "<intro>: MM:SS" on +io+, refreshing the remaining time once per second.
#
# intro    - label printed before the timer.
# duration - number of seconds to wait.
# io       - object responding to #write and #flush (defaults to $stderr).
#
# When the wait ends (or is interrupted) the ticker thread is killed and the
# line is wiped with a carriage return plus ANSI "erase to end of screen".
def countdown(intro, duration, io=$stderr)
  ticker = Thread.new do
    io.write("#{intro}: ")
    io.flush
    deadline = Time.now + duration
    loop do
      seconds_left = (deadline - Time.now).to_i
      # divmod(60) yields [minutes, seconds] for the two format slots.
      io.write(format("%02d:%02d", *seconds_left.divmod(60)))
      io.flush
      sleep 1
      # Step the cursor back over the five timer characters so the next
      # tick overwrites them in place.
      io.write("\033[5D")
      io.flush
    end
  end
  sleep duration
ensure
  ticker.kill
  io.write("\r\033[J")
  io.flush
end
# Runs kafka-consumer-offset-checker.sh once and parses its whitespace-
# separated table output.
#
# The script is looked up in ENV['KAFKA_BIN'], falling back to the directory
# containing this file.
#
# zookeeper - value passed to the checker's --zookeeper option.
# group     - value passed to --group (defaults to the group this script was
#             originally written for).
# topic     - value passed to --topic (defaults likewise).
#
# Returns a Hash keyed by each row's Owner column. Every value is a Hash of
# that row's columns (header names as Symbol keys; all-digit values are
# converted to Integer) plus :Time, the Time taken just before the command
# was run.
#
# Raises RuntimeError when the checker script is missing or not executable,
# when the command does not exit successfully (message is its stdout), or
# when it produces no output at all.
def capture_snapshot(zookeeper, group: 'phoenix_consumer_2016_02_07', topic: 'dryad_persist_1')
  offset_checker = File.expand_path('kafka-consumer-offset-checker.sh', (ENV['KAFKA_BIN'] || File.dirname(__FILE__)))
  # File.exist? rather than File.exists?: the latter was removed in Ruby 3.2.
  unless File.exist?(offset_checker) && File.executable?(offset_checker)
    fail("#{File.basename(offset_checker)} not found at #{File.dirname(offset_checker)}")
  end

  time = Time.now
  raw_result = `#{offset_checker.shellescape} --group #{group.shellescape} --topic #{topic.shellescape} --zookeeper #{zookeeper.shellescape} 2>/dev/null`
  # success? also covers a child killed by a signal, where exitstatus is nil.
  fail(raw_result) unless $?.success?

  # Blank lines carry no columns; drop them so they do not become rows.
  rows = raw_result.lines.map(&:chomp).map(&:split).reject(&:empty?)
  header_row = rows.shift
  fail("no output from #{File.basename(offset_checker)}") if header_row.nil?
  headers = header_row.map(&:to_sym)

  rows.each_with_object({}) do |item, m0|
    hash = headers.zip(item).each_with_object({Time: time}) do |(header, value), m1|
      # zip pads short rows with nil, so guard before matching. Base 10 is
      # explicit so digits with a leading zero are not read as octal.
      value = Integer(value, 10) if value && value =~ /\A[0-9]+\z/
      m1[header] = value
    end
    m0[hash[:Owner]] = hash
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment