Skip to content

Instantly share code, notes, and snippets.

@gyk
Created December 9, 2016 11:46
Show Gist options
  • Save gyk/26062b4879a6ca0c35ebcc514686254d to your computer and use it in GitHub Desktop.
Save gyk/26062b4879a6ca0c35ebcc514686254d to your computer and use it in GitHub Desktop.
Measure Kafka consumer group throughput via Burrow API
"Measure Kafka consumer group throughput via Burrow API"
import json
import urllib2
import time
import sys
import argparse
# URL templates for the two Burrow endpoints this script polls. They are
# filled in with str.format() from the parsed command-line arguments
# (host, port, consumer, topic). Note that the path segment after /kafka/
# is hard-coded to 'DC' -- presumably the Burrow cluster name; adjust it if
# your Burrow configuration names the cluster differently.
_BURROW_BASE_TEMPLATE = 'http://{host}:{port}/v2/kafka/DC'
BURROW_TOPIC_TEMPLATE = _BURROW_BASE_TEMPLATE + '/topic/{topic}'
BURROW_CONSUMER_GROUP_TEMPLATE = _BURROW_BASE_TEMPLATE + '/consumer/{consumer}/topic/{topic}'
def mean(a):
    """Return the arithmetic mean of the numbers in *a* as a float.

    The sum is converted to float before dividing so that integer inputs
    (such as offsets) are not floor-divided under Python 2.

    Raises:
        ZeroDivisionError: if *a* is empty.
    """
    return float(sum(a)) / len(a)
# Command-line interface.
#
# '-h' is used as the short form of --host, so argparse's automatic
# -h/--help option is disabled (add_help=False) and --help is registered
# by hand without a short alias.
#
# Each default is declared next to its option and has the same type as the
# option's `type=` converter, so the parsed value and
# parser.get_default() agree (the port default is an int, not a string).
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument('--help', action='help', help="show this help message")
parser.add_argument('--consumer', '-c', type=str, required=True, help="Consumer group id")
parser.add_argument('--topic', '-t', type=str, default='log', help="Topic")
parser.add_argument('--host', '-h', type=str, default='127.0.0.1', help="Host")
parser.add_argument('--port', '-p', type=int, default=8000, help="Port")
parser.add_argument('--interval', '-i', type=int, default=30,
                    help="Interval in seconds (should not be too small)")
def _fetch_json(url):
    """Open *url* with urllib2 and return its body decoded as JSON.

    The response is closed even when decoding fails, so the endless polling
    loop below does not accumulate open connections.
    """
    response = urllib2.urlopen(url)
    try:
        return json.load(response)
    finally:
        response.close()


if __name__ == '__main__':
    # Poll Burrow forever, printing the consumer group's lag and its
    # throughput (change in summed consumer offsets per second) between
    # consecutive samples.
    args = parser.parse_args()
    arg_dict = vars(args)

    # The URLs depend only on the command-line arguments: build them once.
    topic_url = BURROW_TOPIC_TEMPLATE.format(**arg_dict)
    consumer_group_url = BURROW_CONSUMER_GROUP_TEMPLATE.format(**arg_dict)

    # Previous sample of the summed consumer-group offsets and the time it
    # was taken. Both stay None until the first successful sample, because
    # a rate needs two samples to be meaningful.
    last_consumer_offset = None
    last_time = None

    while True:
        topic_response = _fetch_json(topic_url)
        consumer_group_response = _fetch_json(consumer_group_url)
        now = time.time()

        if 'offsets' not in consumer_group_response:
            print("Consumer group not ready.")
            time.sleep(10)
            continue

        # 'offsets' is summed as a list of numbers -- presumably one entry
        # per partition; confirm against the Burrow API in use.
        consumer_group_offsets = consumer_group_response['offsets']
        topic_offsets = topic_response['offsets']
        # Debug aid:
        # print("Average offset:  %s" % mean(consumer_group_offsets))

        topic_total_offset = sum(topic_offsets)
        consumer_group_total_offset = sum(consumer_group_offsets)

        # The doubled spaces reproduce the output of the original Python 2
        # print statements exactly.
        print("Lag:  %s" % (topic_total_offset - consumer_group_total_offset))
        if last_time is None:
            # No earlier sample to compare with yet; dividing the absolute
            # offset by the absolute clock value would print garbage.
            print("Throughput:  n/a (first sample)")
        else:
            throughput = (consumer_group_total_offset - last_consumer_offset) / (now - last_time)
            print("Throughput:  %s  events/s" % throughput)

        last_consumer_offset = consumer_group_total_offset
        last_time = now

        print("")
        time.sleep(args.interval)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment