@mjuric
Last active September 6, 2023 03:59
Useful Kafka wrangling commands

Utilities you'll care about

All these are already installed on epyc.

  • kafkacat (conda install -c conda-forge kafkacat)

  • kt (grab it from https://github.com/fgeller/kt/releases)

  • kafka-* (come with Kafka if you yum install it from Confluent's repo, or via Docker if you're so inclined). Warning -- JVM-based and dreadfully slow.

  • jq (conda install -c conda-forge jq or use your favorite package manager)

Useful commands

Topic querying / management

# List all available topics
kafkacat -b localhost -L | grep topic

# Count the number of packets in a topic by streaming it (one output line per message, so count lines):
kafkacat -C -b localhost -t ztf_20180519_programid2 -e -o beginning -f 'Topic %t [%p] at offset %o\n' | wc -l

# get the list of topics & offsets. The output is tab-separated columns: <topic_name> <partition_id> <offs_earliest> <offs_latest>
kt topic -partitions -filter 'ztf_.*' | jq -r '.name as $name | .partitions[] | [$name, .id, .oldest, .newest] | @tsv' | sort

# get the number of alerts in all topics by summing (latest - earliest) over each topic's partitions (see [here](https://github.com/dirac-institute/zads-terraform/blob/master/provisioning/broker/config/zads-delete-expired-topics#L11) for an explanation of what this does)
kt topic -partitions -filter 'ztf_.*' | jq -r '.name as $name | .partitions[] | [$name, .id, .oldest, .newest] | @tsv' | awk '{a[$1]+=$4-$3} END {for(i in a) print i"\t"a[i]}' | sort

# last message offsets, using JVM-based tools (slow)
kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic ztf_20180516_programid1 --time -1
# first message offsets, using JVM-based tools (slow)
kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic ztf_20180516_programid1 --time -2
# Subtract first from last, partition by partition, and sum the differences to get the number of messages in the topic.
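
The subtraction in the last step can be scripted. This is a minimal sketch, assuming both GetOffsetShell runs were saved to files and that each output line has the form `topic:partition:offset`. The function name `count_messages` and the sample offsets are made up for illustration.

```shell
#!/bin/sh
# count_messages EARLIEST_FILE LATEST_FILE
# Both files are assumed to hold "topic:partition:offset" lines.
# Prints the sum over partitions of (latest - earliest).
count_messages() {
    awk -F: '
        NR == FNR { first[$1 ":" $2] = $3; next }   # first file: earliest offsets
        { total += $3 - first[$1 ":" $2] }          # second file: latest offsets
        END { print total + 0 }
    ' "$1" "$2"
}

# Example with made-up offsets for a two-partition topic
tmpdir=$(mktemp -d)
printf 'ztf_test:0:5\nztf_test:1:0\n'    > "$tmpdir/earliest.txt"
printf 'ztf_test:0:105\nztf_test:1:40\n' > "$tmpdir/latest.txt"
count_messages "$tmpdir/earliest.txt" "$tmpdir/latest.txt"   # (105-5) + (40-0) = 140
rm -r "$tmpdir"
```

In practice the two files would hold the output of the `--time -2` and `--time -1` commands above, in that order.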

Obscure stuff:

# check if compression is working (https://stackoverflow.com/questions/36590471/how-can-i-verify-if-compression-is-working-correctly-in-kafka-0-8-2-2)
kafka-run-class kafka.tools.DumpLogSegments --files /epyc/projects/ztf-alerts/kafka/ztf-kafka-data/broker1/ztf_20180516_programid1-13/00000000000000000000.log --print-data-log | less

Consumer group offsets / management

# list all consumer groups (JVM-based tools; sometimes time out, too slow for repeated calls in scripting)
kafka-consumer-groups --bootstrap-server localhost:9092 --list
# get details on offsets for a consumer group (JVM-based tools; sometimes time out, too slow for repeated calls in scripting)
kafka-consumer-groups --bootstrap-server localhost:9092 --group uwm-ztf --describe

# get list of offsets for all consumer groups
kt group | jq -r '.name as $name | .topic as $topic | .offsets[] | [$name, $topic, .partition, .offset, .lag] | @tsv'
# limit to some groupIDs and topics
kt group -brokers localhost:9092 -filter 'zads-mirror|uwm.*' -topic ztf_20180524_programid1 | jq -r '.name as $name | .topic as $topic | .offsets[] | [$name, $topic, .partition, .offset, .lag] | @tsv'
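
The per-partition rows above can be collapsed into one total lag per consumer group and topic. This is a sketch only, assuming the five tab-separated columns produced by the jq filter above (group, topic, partition, offset, lag). The function name `sum_lag` and the sample rows are hypothetical, and an empty lag field is simply counted as 0.

```shell
#!/bin/sh
# sum_lag: read "group<TAB>topic<TAB>partition<TAB>offset<TAB>lag" rows on stdin
# and print "group<TAB>topic<TAB>total_lag", sorted.
sum_lag() {
    awk -F'\t' '
        { lag[$1 "\t" $2] += $5 }                      # accumulate lag per group and topic
        END { for (k in lag) print k "\t" lag[k] }
    ' | sort
}

# Example with made-up rows
printf 'uwm-ztf\tztf_test\t0\t100\t5\nuwm-ztf\tztf_test\t1\t90\t7\nzads-mirror\tztf_test\t0\t105\t0\n' | sum_lag
```

To use it on live data, pipe the output of either `kt group ... | jq -r '...'` command above into `sum_lag`.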

Resetting consumer group offsets

# reset offsets (*** DON'T RUN THIS UNLESS YOU KNOW WHAT YOU'RE DOING ***)
kafka-consumer-groups --bootstrap-server localhost:9092 --group zads-mirror --reset-offsets --to-earliest --all-topics --execute
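
One way to make the reset less risky is to preview it first. As far as I know, versions of `kafka-consumer-groups` that support `--reset-offsets` also accept `--dry-run` in place of `--execute`, but check `--help` on your install. The sketch below only builds and prints the command line and defaults to the dry-run form. `reset_offsets_cmd` is a hypothetical helper, not part of Kafka.

```shell
#!/bin/sh
# reset_offsets_cmd GROUP [dry-run|execute]
# Prints (does not run) the reset command; the mode defaults to dry-run.
reset_offsets_cmd() {
    group=$1
    mode=${2:-dry-run}
    case $mode in
        dry-run|execute) ;;
        *) echo "mode must be dry-run or execute" >&2; return 1 ;;
    esac
    printf 'kafka-consumer-groups --bootstrap-server localhost:9092 --group %s --reset-offsets --to-earliest --all-topics --%s\n' "$group" "$mode"
}

# Print the preview command, inspect it, then run it by hand
reset_offsets_cmd zads-mirror
```

Printing the command rather than running it keeps the destructive `--execute` form as a deliberate, separate step.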

Topic creation / deletion / modification

# Creating a topic (with 1yr retention time, 14 partitions, named ztf_test)
kafka-topics --zookeeper localhost:2181 --create --replication-factor 1 --partitions 14 --topic ztf_test --config retention.ms=31536000000

# copying ten (10) _text_ messages from one topic onto another
kafkacat -C -b localhost -t ztf_20180531_programid1 -c 10 -e | kafkacat -P -b localhost -t ztf_test

# copying binary messages from one topic onto another:
# see https://github.com/dirac-institute/zads-terraform/blob/master/utils/kafka-cp-topic

# Purging a topic (by lowering retention time)
## Lower the retention time
kafka-topics --zookeeper localhost:2181 --alter --topic ztf_test --config retention.ms=1000
## wait until kafka does cleanup...
## Return to old retention time
kafka-topics --zookeeper localhost:2181 --alter --topic ztf_test --config retention.ms=31536000000

# deleting a topic:
kafka-topics --zookeeper localhost:2181 --delete --topic ztf_test
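
The `retention.ms` values in the create and purge commands above are in milliseconds, so the one-year figure is 365 days converted to milliseconds. This is a small sketch of that arithmetic, assuming a shell with 64-bit integer arithmetic. `days_to_ms` is a hypothetical helper name.

```shell
#!/bin/sh
# days_to_ms DAYS: convert a retention period in days to milliseconds.
days_to_ms() {
    echo $(( $1 * 24 * 60 * 60 * 1000 ))
}

days_to_ms 365   # 31536000000, the 1yr value used above
```

The result can be substituted directly, e.g. `--config retention.ms=$(days_to_ms 30)` for a 30-day retention.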