Skip to content

Instantly share code, notes, and snippets.

@bb01100100
Last active October 9, 2023 17:27
Show Gist options
  • Save bb01100100/e83e87a62dde305205475aab4f5597d5 to your computer and use it in GitHub Desktop.
Save bb01100100/e83e87a62dde305205475aab4f5597d5 to your computer and use it in GitHub Desktop.
Count Kafka topic messages using unix cli tools
#!/usr/bin/env bash
# Author: Kel Graham
# Date: 2019-12-04
# Purpose: Let's use Kafka CLI tools to get topic counts.
# This should be usable on any Kafka install since it uses
# only the kafka-consumer-groups utility and some standard
# unix tools to sum up.
#######################################
# Print the command-line usage summary to stdout.
# Globals:   none
# Arguments: none
#######################################
usage() {
  cat <<EOF

Given a topic name, count the number of messages and provide a topic-level summary

Usage: $0
 -b | --bootstrap-server <host:port>
 -g | --group <name> # Use this group name when determining offsets (WARNING: resets offsets!)
 -t | --topic <name> # Name of topic to count messages on
 -c | --config-file <name> # Any auth-related config you need to pass on to the bootstrap server (SASL, RBAC, etc)

EOF
}

#######################################
# Abort unless the option in $1 is followed by a value.
# Without this check `shift 2` fails when only one argument is left, nothing
# is shifted, and the parsing loop would spin forever.
# Arguments: the remaining command-line arguments ("$@" of the caller)
#######################################
require_value() {
  if (( $# < 2 )); then
    echo "Error: Option $1 requires an argument" >&2
    exit 1
  fi
}

#######################################
# Parse the command line.
# Globals:   GROUP, TOPIC, CONFIG_FILE, BOOTSTRAP_SERVER (written when the
#            matching option is present)
#            PARAMS (written; array of positional arguments)
# Arguments: the script's command-line arguments
# Exits:     1 after printing help (-h), on an unsupported flag, or when an
#            option is missing its value
#######################################
parse_args() {
  # Always start from an empty list so a PARAMS variable inherited from the
  # environment cannot leak into the positional parameters.
  PARAMS=()
  while (( $# )); do
    case "$1" in
      -g|--group)
        require_value "$@"
        GROUP=$2
        shift 2
        ;;
      -t|--topic)
        require_value "$@"
        TOPIC=$2
        shift 2
        ;;
      -c|--config-file)
        require_value "$@"
        CONFIG_FILE=$2
        shift 2
        ;;
      -b|--bootstrap-server)
        require_value "$@"
        BOOTSTRAP_SERVER=$2
        shift 2
        ;;
      -h|--help)
        usage
        exit 1
        ;;
      --) # end of options: everything that follows is positional
        shift
        PARAMS+=("$@")
        break
        ;;
      -*) # unsupported flags
        echo "Error: Unsupported flag $1" >&2
        exit 1
        ;;
      *) # preserve positional arguments
        PARAMS+=("$1")
        shift
        ;;
    esac
  done
}

parse_args "$@"
# Restore the positional arguments without eval, so arguments containing
# spaces or shell metacharacters are kept intact and never re-evaluated.
set -- "${PARAMS[@]}"
#######################################
# Sum the LAG column of `kafka-consumer-groups --describe` output.
# The column is located from the header row, because its position depends on
# the output layout (a leading GROUP column shifts LAG from 5th to 6th).
# If no header row is found, the 5th column is used and the first line is
# skipped, which matches the script's historical behaviour.
# When the header also has a TOPIC column, only rows for the requested topic
# are counted, so a group with offsets on other topics does not inflate the sum.
# Arguments: $1 - topic name to count
# Inputs:    describe output on stdin
# Outputs:   the total lag as an integer on stdout (0 for empty input)
#######################################
sum_lag() {
  awk -v topic="$1" '
    BEGIN { lag_col = 5; topic_col = 0 }
    !header_seen {
      t = 0; l = 0
      for (i = 1; i <= NF; i++) {
        if ($i == "TOPIC") t = i
        else if ($i == "LAG") l = i
      }
      if (l) { header_seen = 1; lag_col = l; topic_col = t; next }
    }
    NR > 1 && (!topic_col || $topic_col == topic) { sum += $lag_col }
    END { printf "%.0f\n", sum }
  '
}

#######################################
# Count the messages on a topic using a throw-away consumer group.
# Globals:   BOOTSTRAP_SERVER, TOPIC (read, required)
#            CONFIG_FILE (read, optional --command-config file)
#            GROUP (read; set to topic_counter.<pid> when empty)
# Outputs:   "Topic <name> has <n> messages" on stdout; diagnostics on stderr
# Returns:   0 on success, 1 on missing arguments, a missing config file, or
#            a failed offset reset
#######################################
count_topic_messages() {
  local -a base_args=()
  local reset_output count status

  if [[ -z "${BOOTSTRAP_SERVER:-}" || -z "${TOPIC:-}" ]]; then
    # Report directly instead of re-running the script through `sh ./$0 -h`:
    # that form breaks for absolute or PATH-based invocations and executes a
    # bash script with a possibly non-bash `sh`.
    echo "Error: both --bootstrap-server and --topic are required (run '$0 --help' for usage)" >&2
    return 1
  fi

  if [[ -z "${GROUP:-}" ]]; then
    GROUP=topic_counter.$$
  fi

  if [[ -n "${CONFIG_FILE:-}" ]]; then
    # A config file that was asked for but is missing would otherwise be
    # dropped silently and surface later as a confusing connection error.
    if [[ ! -e "$CONFIG_FILE" ]]; then
      echo "Error: config file not found: $CONFIG_FILE" >&2
      return 1
    fi
    base_args+=(--command-config "$CONFIG_FILE")
  fi
  base_args+=(--bootstrap-server "$BOOTSTRAP_SERVER" --group "$GROUP")

  if reset_output=$(kafka-consumer-groups "${base_args[@]}" \
      --topic "$TOPIC" --reset-offsets --to-earliest --execute); then
    # The consumer group has just been registered and has never consumed, so
    # its lag equals the number of messages on each topic partition.
    # Sum them up to get the total message count.
    count=$(kafka-consumer-groups "${base_args[@]}" --describe 2>/dev/null \
      | sum_lag "$TOPIC")
    echo "Topic $TOPIC has $count messages"
    status=0
  else
    echo "Had an error setting offsets: $reset_output" >&2
    status=1
  fi

  # Tidy up. NOTE(review): this deletes the group even when it was supplied
  # with --group, exactly as before; confirm that is intended.
  if ! kafka-consumer-groups "${base_args[@]}" --delete >/dev/null 2>&1; then
    if (( status == 0 )); then
      echo "Warning: could not delete consumer group $GROUP" >&2
    fi
  fi

  return "$status"
}

count_topic_messages
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment