Useful Kafka wrangling commands

Utilities you'll care about

All these are already installed on epyc.

  • kafkacat (conda install -c conda-forge kafkacat)

  • kt (grab it from https://github.com/fgeller/kt/releases)

  • kafka-* (come with Kafka if you yum install it from Confluent's repo, or via Docker if you're so inclined). Warning -- JVM based and dreadfully slow.

  • jq (conda install -c conda-forge jq or use your favorite package manager)

Useful commands

Topic querying / management

# List all available topics
kafkacat -b localhost -L | grep topic

# Count the number of packets in a topic, by streaming it (one output line per message):
kafkacat -b localhost -t ztf_20180519_programid2 -e -o beginning -f 'Topic %t [%p] at offset %o\n' | wc -l

# get list of topics & offsets. The output is tab-separated columns: <topic_name> <partition_id> <offs_earliest> <offs_latest>
kt topic -partitions -filter 'ztf_.*' | jq -r '.name as $name | .partitions[] | [$name, .id, .oldest, .newest] | @tsv' | sort

# get the number of alerts in all topics (see [here](https://github.com/dirac-institute/zads-terraform/blob/master/provisioning/broker/config/zads-delete-expired-topics#L11) for an explanation of what this does)
kt topic -partitions -filter 'ztf_.*' | jq -r '.name as $name | .partitions[] | [$name, .id, .oldest, .newest] | @tsv' | awk '{a[$1]+=$4-$3} END {for(i in a) print i"\t"a[i]}' | sort

# last message offsets, using JVM-based tools (slow)
kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic ztf_20180516_programid1 --time -1
# first message offsets, using JVM-based tools (slow)
kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic ztf_20180516_programid1 --time -2
# Subtract the results of last - first to get the number of messages in the topic.
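
The subtraction described above can be scripted. This is a minimal sketch, assuming GetOffsetShell prints one `<topic>:<partition>:<offset>` line per partition; `count_messages` and the two file names are hypothetical, not part of any Kafka tool.

```shell
# Hypothetical helper: sum (latest - earliest) over all partitions.
# $1 = file with GetOffsetShell output for --time -2 (earliest offsets)
# $2 = file with GetOffsetShell output for --time -1 (latest offsets)
# Assumes every line looks like <topic>:<partition>:<offset>.
count_messages() {
    awk -F: '
        NR == FNR { first[$1 ":" $2] = $3; next }   # first file: remember earliest offsets
        { total += $3 - first[$1 ":" $2] }          # second file: add latest - earliest
        END { print total + 0 }
    ' "$1" "$2"
}

# Usage (slow, JVM-based):
#   kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic ztf_20180516_programid1 --time -2 > earliest.txt
#   kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic ztf_20180516_programid1 --time -1 > latest.txt
#   count_messages earliest.txt latest.txt
```

The kt-based pipeline earlier in this section gives the same number much faster; this is only useful when the JVM tools are all you have.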

Obscure stuff:

# check if compression is working (https://stackoverflow.com/questions/36590471/how-can-i-verify-if-compression-is-working-correctly-in-kafka-0-8-2-2)
kafka-run-class kafka.tools.DumpLogSegments --files /epyc/projects/ztf-alerts/kafka/ztf-kafka-data/broker1/ztf_20180516_programid1-13/00000000000000000000.log --print-data-log | less

Consumer group offsets / management

# list all consumer groups (JVM-based tools; sometimes time out, too slow for repeated calls in scripting)
kafka-consumer-groups --bootstrap-server localhost:9092 --list
# get details on offsets for a consumer group (JVM-based tools; sometimes time out, too slow for repeated calls in scripting)
kafka-consumer-groups --bootstrap-server localhost:9092 --group uwm-ztf --describe

# get list of offsets for all consumer groups
kt group | jq -r '.name as $name | .topic as $topic | .offsets[] | [$name, $topic, .partition, .offset, .lag] | @tsv'
# limit to some groupIDs and topics
kt group -brokers localhost:9092 -filter 'zads-mirror|uwm.*' -topic ztf_20180524_programid1 | jq -r '.name as $name | .topic as $topic | .offsets[] | [$name, $topic, .partition, .offset, .lag] | @tsv'
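
To see at a glance how far behind each consumer group is, the per-partition lag from the pipelines above can be summed per group and topic. A rough sketch, assuming the TSV column order shown above (group, topic, partition, offset, lag); `sum_lag` is a hypothetical helper name.

```shell
# Hypothetical helper: read the TSV from the kt/jq pipeline on stdin
# (<group> <topic> <partition> <offset> <lag>) and print the total lag
# per group and topic as <group> <topic> <total_lag>.
sum_lag() {
    awk -F'\t' '
        { lag[$1 "\t" $2] += $5 }                    # accumulate lag per group+topic
        END { for (k in lag) print k "\t" lag[k] }
    ' | sort
}

# Usage:
#   kt group -brokers localhost:9092 -filter 'zads-mirror|uwm.*' -topic ztf_20180524_programid1 \
#     | jq -r '.name as $name | .topic as $topic | .offsets[] | [$name, $topic, .partition, .offset, .lag] | @tsv' \
#     | sum_lag
```

An empty lag field is counted as 0 by awk, so partitions with no reported lag don't break the sum.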

Resetting consumer group offsets

# reset offsets (*** DON'T RUN THIS UNLESS YOU KNOW WHAT YOU'RE DOING ***); without --execute the tool only prints the offsets it would set
kafka-consumer-groups --bootstrap-server localhost:9092 --group zads-mirror --reset-offsets --to-earliest --all-topics --execute

Topic creation / deletion / modification

# Creating a topic (with 1yr retention time, 14 partitions, named ztf_test)
kafka-topics --zookeeper localhost:2181 --create --replication-factor 1 --partitions 14 --topic ztf_test --config retention.ms=31536000000

# copying ten (10) _text_ messages from one topic onto another
kafkacat -C -b localhost -t ztf_20180531_programid1 -c 10 -e | kafkacat -P -b localhost -t ztf_test

# copying binary messages from one topic onto another:
# see https://github.com/dirac-institute/zads-terraform/blob/master/utils/kafka-cp-topic

# Purging a topic (by lowering retention time)
## Lower the retention time
kafka-topics --zookeeper localhost:2181 --alter --topic ztf_test --config retention.ms=1000
## wait until Kafka does cleanup (the broker checks every log.retention.check.interval.ms, 5 minutes by default)...
## Return to old retention time
kafka-topics --zookeeper localhost:2181 --alter --topic ztf_test --config retention.ms=31536000000

# deleting a topic:
kafka-topics --zookeeper localhost:2181 --delete --topic ztf_test
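
The "wait until cleanup" step in the purge recipe above can be automated by polling until every partition reports oldest == newest. A sketch only, assuming kt and jq are on the PATH and reusing the kt/jq pipeline from the topic querying section; `topic_is_empty` and `wait_for_purge` are hypothetical helper names, and the 10 second poll interval is arbitrary.

```shell
# Hypothetical helper: succeed (exit 0) when every partition on stdin is empty.
# Input is TSV: <topic_name> <partition_id> <offs_earliest> <offs_latest>
topic_is_empty() {
    awk -F'\t' '$3 != $4 { nonempty = 1 } END { exit nonempty + 0 }'
}

# Hypothetical helper: block until topic $1 has been purged.
# Caveat: if kt fails and prints nothing, the topic is treated as empty.
wait_for_purge() {
    until kt topic -partitions -filter "^$1\$" \
            | jq -r '.name as $name | .partitions[] | [$name, .id, .oldest, .newest] | @tsv' \
            | topic_is_empty; do
        sleep 10
    done
}

# Usage, between lowering and restoring retention.ms:
#   wait_for_purge ztf_test
```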