@padilo
Created July 3, 2019 13:22
Script to get from Kafka the list of topics consumed by each consumer group
#!/bin/bash
if [ $# -ne 2 ]; then
    >&2 echo "usage: $0 <kafka_binary_folder> <bootstrap.server>"
    >&2 echo
    >&2 echo "It outputs the list of topics consumed by each consumer group as CSV,"
    >&2 echo "with the following structure:"
    >&2 echo "  <consumer>,<topic>"
    exit 1
fi
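kafka_path=$1/bin
bootstrap=$2
# Scratch directory that holds the PID of the background consumer.
tmp_dir=$(mktemp -d "${TMPDIR:-/tmp}/$(basename "$0").pid.XXXXXXXXXXXX")
# Throwaway group id, so the lag of this reader can be queried on its own.
consumer_group=consumer-reader-$RANDOM
pid_file=$tmp_dir/pid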
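# Read __consumer_offsets in the background and reduce every record to a
# unique "<group>,<topic>" line. The consumer's PID is written to $pid_file
# through file descriptor 3 so that stop_consumer can kill it later.
function launch_consumer {
    ( "${kafka_path}/kafka-console-consumer.sh" \
        --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
        --bootstrap-server "${bootstrap}" \
        --topic __consumer_offsets --from-beginning \
        --consumer-property group.id="${consumer_group}" & echo $! >&3 ) 3>"${pid_file}" \
        | cut -f1 -d":" | tr -d "[]" | cut -f1,2 -d"," | sort -u &
}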
function stop_consumer {
    kill "$(<"$pid_file")"
    >&2 echo "Finishing..."
    # Leave time for the rest of the pipeline to drain and print its output.
    sleep 10
}
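# Sum the LAG column (assumed to be the 5th field) over all partitions.
# "s+0" makes awk print 0 instead of an empty string when there are no rows.
function get_lag {
    lag=$("${kafka_path}/kafka-consumer-groups.sh" --bootstrap-server "${bootstrap}" \
        --offsets --describe --group "${consumer_group}" \
        | tr -s ' ' ',' | cut -f5 -d, | tail -n +3 | awk '{s+=$1} END {print s+0}')
}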
function print_progress {
    >&2 echo "$(echo "100 - 100 * $lag / $first_lag" | bc)%"
}
launch_consumer
>&2 echo "Waiting a bit until consumer starts..."
sleep 20
>&2 echo "Reading..."
get_lag
first_lag=$lag
while [ "${lag:-0}" -gt 1000 ]; do
    sleep 5
    get_lag
    print_progress
done
>&2 echo "Waiting extra time to consume last offsets..."
sleep 10
stop_consumer
rm -rf "$tmp_dir"
>&2 echo "Done"
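
The text-processing half of the script can be tried without a broker. The following is a minimal sketch, assuming the formatter prints lines that start with a `[group,topic,partition]` key followed by a colon, which is what the `cut`/`tr` chain expects. The function name `extract_group_topic` and the sample lines are made up for illustration.

```bash
# Sketch: reduce formatter-style lines to unique "<group>,<topic>" pairs.
# extract_group_topic is a hypothetical helper; the sample data is invented.
extract_group_topic() {
    cut -f1 -d":" |        # keep everything before the first colon
        tr -d "[]" |       # drop the square brackets around the key
        cut -f1,2 -d"," |  # keep group and topic, drop the partition
        sort -u            # one line per distinct pair
}

printf '%s\n' \
    '[billing,orders,0]::sample-value' \
    '[billing,orders,1]::sample-value' \
    '[audit,payments,0]::sample-value' | extract_group_topic
```

Because `sort -u` cannot print anything until its input ends, the real script only produces output once the consumer has been killed.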

padilo commented Jul 3, 2019

This script has to read the whole __consumer_offsets topic, so it may take a long time.
It writes the result to stdout; progress messages go to stderr.
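
While it runs, the script prints a percentage based on the remaining lag of its own consumer group. Here is a rough sketch of that calculation on an invented table, laid out the way the script's `cut -f5` assumes (a blank line, a header, then rows with LAG in the fifth column). The names `sum_lag` and `progress_percent` are hypothetical, and shell arithmetic stands in for `bc`; both truncate the integer division.

```bash
# Sketch: sum the LAG column and turn it into a completion percentage.
# sum_lag and progress_percent are hypothetical helpers; the table is made up.
sum_lag() {
    tr -s ' ' ',' | cut -f5 -d, | tail -n +3 | awk '{s+=$1} END {print s+0}'
}

progress_percent() {  # args: <current_lag> <first_lag>
    echo $(( 100 - 100 * $1 / $2 ))
}

sample='
TOPIC              PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
__consumer_offsets 0         100            400            300 -           -    -
__consumer_offsets 1         50             250            200 -           -    -'

lag=$(printf '%s\n' "$sample" | sum_lag)
echo "lag=$lag progress=$(progress_percent "$lag" 2000)%"
```

With a first lag of 2000 and a current lag of 500, this reports 75%.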

Example:
kafka-consumer-by-topic.sh ~/apps/kafka_2.12-2.0.1 localhost:9092 > output.csv
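
The resulting CSV has one `<consumer>,<topic>` pair per line, so it may be handy to collapse it to one line per consumer group. This is one possible sketch, not part of the original script; `topics_by_group` is a hypothetical helper, and it assumes group names contain no commas.

```bash
# Sketch: turn "<consumer>,<topic>" lines into "<consumer>: topic1 topic2 ...".
# topics_by_group is a hypothetical helper, not part of the gist.
topics_by_group() {
    # Sort by group, then topic, so each group's lines are adjacent.
    LC_ALL=C sort -t, -k1,1 -k2,2 | awk -F, '
        $1 != group { if (NR > 1) print line; group = $1; line = $1 ": " $2; next }
        { line = line " " $2 }
        END { if (NR > 0) print line }
    '
}

printf '%s\n' 'billing,orders' 'audit,payments' 'billing,invoices' | topics_by_group
```

On a real run it would be fed the generated file, for example `topics_by_group < output.csv`.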

sharathkumar13 commented Mar 18, 2020

Thank You. This worked well. Helped a lot. :-)