All these are already installed on epyc.

- `kafkacat` (`conda install -c conda-forge kafkacat`)
- `kt` (grab it from https://github.com/fgeller/kt/releases)
- `kafka-*` (come with Kafka, e.g. if you `yum install` it from Confluent's repo, or via Docker if you're so inclined). Warning -- JVM based and dreadfully slow.
- `jq` (`conda install -c conda-forge jq`, or use your favorite package manager)
# List all available topics
kafkacat -b localhost -L | grep topic
# Count the number of packets in a topic, by streaming it:
kafkacat -b localhost -t ztf_20180519_programid2 -e -o beginning -f 'Topic %t [%p] at offset %o\n' | wc -l
# get list of topics & offsets. The output is tab-separated columns: <topic_name> <partition_id> <offs_earliest> <offs_latest>
kt topic -partitions -filter 'ztf_.*' | jq -r '.name as $name | .partitions[] | [$name, .id, .oldest, .newest] | @tsv' | sort
# get the number of alerts in all topics (see [here](https://github.com/dirac-institute/zads-terraform/blob/master/provisioning/broker/config/zads-delete-expired-topics#L11) for explanation on what this does)
kt topic -partitions -filter 'ztf_.*' | jq -r '.name as $name | .partitions[] | [$name, .id, .oldest, .newest] | @tsv' | awk '{a[$1]+=$4-$3} END {for(i in a) print i"\t"a[i]}' | sort
# last message offsets, using JVM-based tools (slow)
kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic ztf_20180516_programid1 --time -1
# first message offsets, using JVM-based tools (slow)
kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic ztf_20180516_programid1 --time -2
# Subtract the results of last - first to get the number of messages in the topic.
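One way to do the subtraction in one go, pairing the two GetOffsetShell outputs (each prints `<topic>:<partition>:<offset>`, one line per partition; the `sort` is there so both sides line up partition by partition):
paste <(kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic ztf_20180516_programid1 --time -2 | sort) \
      <(kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic ztf_20180516_programid1 --time -1 | sort) \
  | awk -F'[:\t]' '{n += $6 - $3} END {print n}'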
Obscure stuff:
# check if compression is working (https://stackoverflow.com/questions/36590471/how-can-i-verify-if-compression-is-working-correctly-in-kafka-0-8-2-2)
kafka-run-class kafka.tools.DumpLogSegments --files /epyc/projects/ztf-alerts/kafka/ztf-kafka-data/broker1/ztf_20180516_programid1-13/00000000000000000000.log --print-data-log | less
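What to look for: each message in the dump carries a `compresscodec` attribute (the exact spelling of the codec values varies with Kafka version), e.g.:
kafka-run-class kafka.tools.DumpLogSegments --files /epyc/projects/ztf-alerts/kafka/ztf-kafka-data/broker1/ztf_20180516_programid1-13/00000000000000000000.log --print-data-log | grep -m1 compresscodec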
# list all consumer groups (JVM-based tools; sometimes time out, too slow for repeated calls in scripting)
kafka-consumer-groups --bootstrap-server localhost:9092 --list
# get details on offsets for a consumer group (JVM-based tools; sometimes time out, too slow for repeated calls in scripting)
kafka-consumer-groups --bootstrap-server localhost:9092 --group uwm-ztf --describe
# get list of offsets for all consumer groups
kt group | jq -r '.name as $name | .topic as $topic | .offsets[] | [$name, $topic, .partition, .offset, .lag] | @tsv'
# limit to some groupIDs and topics
kt group -brokers localhost:9092 -filter 'zads-mirror|uwm.*' -topic ztf_20180524_programid1 | jq -r '.name as $name | .topic as $topic | .offsets[] | [$name, $topic, .partition, .offset, .lag] | @tsv'
# reset offsets (*** DON'T RUN THIS UNLESS YOU KNOW WHAT YOU'RE DOING ***)
kafka-consumer-groups --bootstrap-server localhost:9092 --group zads-mirror --reset-offsets --to-earliest --all-topics --execute
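# preview what a reset would do without committing it (swap --execute for --dry-run)
kafka-consumer-groups --bootstrap-server localhost:9092 --group zads-mirror --reset-offsets --to-earliest --all-topics --dry-run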
# Creating a topic (with 1yr retention time, 14 partitions, named ztf_test)
kafka-topics --zookeeper localhost:2181 --create --replication-factor 1 --partitions 14 --topic ztf_test --config retention.ms=31536000000
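# sanity-check the new topic (partition count, retention config)
kafka-topics --zookeeper localhost:2181 --describe --topic ztf_test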
# copying ten (10) _text_ messages from one topic onto another
kafkacat -C -b localhost -t ztf_20180531_programid1 -c 10 -e | kafkacat -P -b localhost -t ztf_test
# copying _binary_ messages from one topic onto another (the kafkacat pipe above won't work here: it delimits messages with newlines, which corrupts binary payloads)
See https://github.com/dirac-institute/zads-terraform/blob/master/utils/kafka-cp-topic
# Purging a topic (by lowering retention time)
## Lower the retention time
kafka-topics --zookeeper localhost:2181 --alter --topic ztf_test --config retention.ms=1000
## wait until kafka does cleanup...
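## one way to watch for it, reusing the kt/jq pattern from above (the purge is done once .oldest == .newest on every partition):
watch -n 60 "kt topic -brokers localhost:9092 -partitions -filter '^ztf_test$' | jq -r '.partitions[] | [.id, .oldest, .newest] | @tsv'"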
## Return to old retention time
kafka-topics --zookeeper localhost:2181 --alter --topic ztf_test --config retention.ms=31536000000
# deleting a topic:
kafka-topics --zookeeper localhost:2181 --delete --topic ztf_test
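# note: this only takes effect if the broker runs with delete.topic.enable=true; confirm the topic is gone with
kafka-topics --zookeeper localhost:2181 --list | grep ztf_test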