Skip to content

Instantly share code, notes, and snippets.

@tolitius
Last active March 17, 2020 18:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tolitius/d79d5ec467c1c7effb45fef3867cdddc to your computer and use it in GitHub Desktop.
Save tolitius/d79d5ec467c1c7effb45fef3867cdddc to your computer and use it in GitHub Desktop.
kafka: tail last N messages

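Requirements:

  • Kafka installed under /opt/kafka (the script calls the tools in /opt/kafka/bin)
  • a reachable ZooKeeper address (host:port), passed as the first argument
  • jq installed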
#!/bin/bash
## adapted from https://gist.github.com/forestjohnsonpeoplenet/d56e6f6a0d1070bcd74362a76c9dafec
## made to work for kafka 2.4.1 (+ ?)
##
## prints the last N messages of each partition of a kafka topic
##
## usage:
## ./ktail.sh localhost:2181 jobs 10
## ./ktail.sh zookeeper-hostname:2181 topic-name 10 America/Chicago
## ./ktail.sh zookeeper-hostname:2181 topic-name 10 America/Chicago raw
##
## arguments:
## $1 zookeeper host:port (required)
## $2 topic name (required)
## $3 number of messages to consume from each partition (required, digits only)
## $4 time zone used when message timestamps are rendered (optional)
## $5 the literal word "raw" turns off partition headers and timestamps (optional)

USAGE_EXAMPLE="usage example: $0 zookeeper-hostname:2181 topic-name 10 America/Chicago"

## Prints the problem ($1) and the usage example on stderr, then exits with 1.
function usage_error () {
  echo "$1" >&2
  echo "$USAGE_EXAMPLE" >&2
  exit 1
}

if [ -z "$1" ]; then
  usage_error "missing first argument, zookeeper host port like: zookeeper-hostname:2181"
fi

if [ -z "$2" ]; then
  usage_error "missing second argument, topic name like: topic-name"
fi

if [ -z "$3" ]; then
  usage_error "missing third argument, number of messages to consume from each partition"
fi

## the count is later used in integer comparisons and offset arithmetic, so
## anything that is not made of digits only is rejected right here
case "$3" in
  *[!0-9]*)
    usage_error "third argument must be a non-negative integer, got: $3"
    ;;
esac

ZK=$1
TOPIC=$2
READ_LAST_COUNT=$3
## NOTE: TZ becomes the empty string when the fourth argument is omitted
TZ=$4
RAW=$5
## Resolves the kafka brokers registered in zookeeper into a broker list.
##
## Arguments: $1 - zookeeper address (host:port)
## Outputs:   one line on stdout with comma separated host:port entries, one
##            per broker id listed under /brokers/ids; warnings go to stderr
## Returns:   0 when at least one broker was resolved, 1 otherwise (nothing is
##            printed on stdout in that case)
function get_broker_list () {
  local zk="$1"
  local broker_ids broker_id broker_json broker_host broker_port
  local broker_list=''

  # only the line starting with "[" is kept from the zookeeper shell output;
  # it is parsed as a JSON array of broker ids
  broker_ids=$(/opt/kafka/bin/zookeeper-shell.sh "$zk" <<< "ls /brokers/ids" \
    | grep '^\[' \
    | jq -r '.[]')

  while IFS= read -r broker_id; do
    # a here-string always delivers at least one (possibly empty) line, so
    # blank or non-numeric entries have to be skipped explicitly
    case "$broker_id" in
      '' | *[!0-9]*) continue ;;
    esac

    # the broker metadata is the output line that carries the "host" key
    broker_json=$(/opt/kafka/bin/zookeeper-shell.sh "$zk" <<< "get /brokers/ids/$broker_id" 2>/dev/null \
      | grep '"host":')
    if [ -z "$broker_json" ]; then
      echo "no metadata found for broker id $broker_id, skipping" >&2
      continue
    fi

    broker_host=$(jq -r '.host' <<< "$broker_json")
    broker_port=$(jq -r '.port' <<< "$broker_json")

    # jq -r prints the word "null" for a missing or null field
    if [ -z "$broker_host" ] || [ "$broker_host" = "null" ] \
      || [ -z "$broker_port" ] || [ "$broker_port" = "null" ]; then
      echo "broker id $broker_id has no usable host/port, skipping" >&2
      continue
    fi

    # the separator is only added between entries, so no trailing comma
    broker_list="${broker_list:+$broker_list,}$broker_host:$broker_port"
  done <<< "$broker_ids"

  if [ -z "$broker_list" ]; then
    echo "no kafka brokers found in zookeeper at $zk" >&2
    return 1
  fi

  printf '%s\n' "$broker_list"
}
## Main flow: resolve the brokers, read the first and last offset of every
## partition of $TOPIC, then print up to $READ_LAST_COUNT messages from the end
## of each partition. Unless $RAW is "raw", every partition gets a header line
## and "CreateTime:<millis>" prefixes are rendered as dates in time zone $TZ.

## Returns 0 when $1 is a non-empty string of decimal digits, 1 otherwise.
function is_unsigned_integer () {
  case "$1" in
    '' | *[!0-9]*) return 1 ;;
  esac
  return 0
}

BROKER_LIST=$(get_broker_list "$ZK")
# a list that is empty once its separators are removed (e.g. a bare ":" built
# from an empty host and port) means that no broker was resolved
if [ -z "${BROKER_LIST//[:,]/}" ]; then
  echo "could not resolve any kafka broker from zookeeper at $ZK" >&2
  exit 1
fi

# GetOffsetShell output is parsed as topic:partition:offset lines; --time -2
# requests the earliest and --time -1 the latest offset of each partition.
# Sorting numerically on the partition field keeps partition 10 after 9.
PARTITION_OFFSET_START_LIST=$(/opt/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list "$BROKER_LIST" --time -2 --topic "$TOPIC" | sort -t: -k2,2n)
PARTITION_OFFSET_END_LIST=$(/opt/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list "$BROKER_LIST" --time -1 --topic "$TOPIC")

# end offsets indexed by partition id, so that a start offset is always paired
# with the end offset of the very same partition
END_OFFSETS=()
while IFS=: read -r _ PARTITION_ID OFFSET_END; do
  if is_unsigned_integer "$PARTITION_ID" && is_unsigned_integer "$OFFSET_END"; then
    END_OFFSETS[$PARTITION_ID]=$OFFSET_END
  fi
done <<< "$PARTITION_OFFSET_END_LIST"

# the consumer options do not change between partitions, build them once
CONSUMER_ARGS=(--property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer)
if [ "$RAW" != "raw" ]; then
  CONSUMER_ARGS=(--property print.timestamp=true "${CONSUMER_ARGS[@]}")
fi

# a consumer line that starts with "CreateTime:<millis><TAB>" followed by the payload
TIMESTAMPED_LINE_PATTERN=$'^CreateTime:([0-9]+)\t(.*)$'

# the partition list is read through fd 3 so that the commands inside the loop
# keep the stdin of the script
while IFS=: read -r -u 3 _ PARTITION_ID OFFSET_START; do
  if ! is_unsigned_integer "$PARTITION_ID" || ! is_unsigned_integer "$OFFSET_START"; then
    continue
  fi
  OFFSET_END=${END_OFFSETS[$PARTITION_ID]}
  if ! is_unsigned_integer "$OFFSET_END"; then
    continue
  fi

  MAX_MESSAGES=$((OFFSET_END - OFFSET_START))
  if [ "$MAX_MESSAGES" -gt "$READ_LAST_COUNT" ]; then
    # 10# forces base 10, a count such as "010" must not be read as octal
    MAX_MESSAGES=$((10#$READ_LAST_COUNT))
    OFFSET_START=$((OFFSET_END - MAX_MESSAGES))
  fi

  if [ "$RAW" != "raw" ]; then
    echo "$TOPIC partition $PARTITION_ID last $MAX_MESSAGES messages:"
  fi

  # nothing to read: do not start a consumer and do not print a blank line
  if [ "$MAX_MESSAGES" -le 0 ]; then
    continue
  fi

  MESSAGES_WITH_TIMESTAMPS=$(/opt/kafka/bin/kafka-console-consumer.sh "${CONSUMER_ARGS[@]}" \
    --bootstrap-server "$BROKER_LIST" --topic "$TOPIC" \
    --offset "$OFFSET_START" --max-messages "$MAX_MESSAGES" --partition "$PARTITION_ID")

  while IFS= read -r LINE; do
    if [[ "$LINE" =~ $TIMESTAMPED_LINE_PATTERN ]]; then
      TIMESTAMP=${BASH_REMATCH[1]}
      # literal \xHH escape sequences in the payload are shown as a blank
      BYTES=${BASH_REMATCH[2]//\\x[0-9ABCDEF][0-9ABCDEF]/ }
      # the timestamp is in milliseconds, date expects seconds
      DATETIME_READABLE=$(TZ="$TZ" date -d@"$((10#$TIMESTAMP / 1000))")
      printf '%s\n' "$DATETIME_READABLE $BYTES"
    else
      # lines without a CreateTime prefix (raw mode included) are printed as they are
      printf '%s\n' "$LINE"
    fi
  done <<< "$MESSAGES_WITH_TIMESTAMPS"
done 3<<< "$PARTITION_OFFSET_START_LIST"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment