Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@sonhmai
Last active March 10, 2024 19:48
Show Gist options
  • Star 13 You must be signed in to star a gist
  • Fork 9 You must be signed in to fork a gist
  • Save sonhmai/5b2b4455162c808c091b661aeb675625 to your computer and use it in GitHub Desktop.
Save sonhmai/5b2b4455162c808c091b661aeb675625 to your computer and use it in GitHub Desktop.
kafka basic commands
# BENCHMARK-----------------------------------------------------
#1. Rust kafka-benchmark (https://github.com/fede1024/kafka-benchmark)
# must create topic with 6 partitions first
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 6--topic [scenario]
# replace scenario with one in file kafka-benchmark/config/base_producer.yaml

Kafka Topics

List existing topics

bin/kafka-topics.sh --zookeeper localhost:2181 --list

Describe a topic

bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic mytopic

Purge a topic

bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic mytopic --config retention.ms=1000

... wait a minute ...

bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic mytopic --delete-config retention.ms

Delete a topic

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic mytopic

Get number of messages in a topic ???

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic mytopic --time -1 --offsets 1 | awk -F ":" '{sum += $3} END {print sum}'

Get the earliest offset still in a topic

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic mytopic --time -2

Get the latest offset still in a topic

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic mytopic --time -1

Consume messages with the console consumer

bin/kafka-console-consumer.sh --new-consumer --bootstrap-server localhost:9092 --topic mytopic --from-beginning

Get the consumer offsets for a topic

bin/kafka-consumer-offset-checker.sh --zookeeper=localhost:2181 --topic=mytopic --group=my_consumer_group

Read from __consumer_offsets

Add the following property to config/consumer.properties: exclude.internal.topics=false

bin/kafka-console-consumer.sh --consumer.config config/consumer.properties --from-beginning --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"

Kafka Consumer Groups

List the consumer groups known to Kafka

bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list (old api)

bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list (new api)

View the details of a consumer group

bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --describe --group <group name>

kafkacat

Getting the last five message of a topic

kafkacat -C -b localhost:9092 -t mytopic -p 0 -o -5 -e

Zookeeper

Starting the Zookeeper Shell

bin/zookeeper-shell.sh localhost:2181

#run Java class
# ConsumerOffsetCheck. run when Kafka server is up, there is a topic + messages produced and consumed
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --broker-info --zookeeper localhost:2181 --group test-consumer-group
# ConsumerOffsetChecker has been removed in Kafka 1.0.0. Use kafka-consumer-groups.sh to get consumer group details
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group consule-consumer-38063
# bootstrap-server=kafka broker server to connect to, NOT zookeeper
# --describe => describe consumer group and list offset lag (# messages not yet processed)
# get a list of active consumer groups in cluster
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
#get latest offset
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -1 --offsets 1
# offset of last available message in topic's partition --time -1
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -1
# offset of first available message in topic's partition --time -2
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -2
# start zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
# start kafka brokers (Servers = cluster)
bin/kafka-server-start.sh config/server.properties
# create a topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
# list all topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
# configure brokers to auto-create topics when a non-existent topic is published to
# see topic details (partition, replication factor)
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
# change partition number of a topic --alter
# Note: While Kafka allows us to add more partitions, it is NOT possible to decrease number of partitions of a Topic.
# In order to achieve this, you need to delete and re-create your Topic.
bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic test --partitions 3
# PRODUCER -----------------------------------
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
# CONSUMER -----------
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test
# removed --from-beginning => consumer only gets message that is produced after it is up
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
#Kafka CONNECT-----------------------------
# 2 stand-alone connectors (run in a single,local,dedicated process)
bin/connect-standalone.sh \
config/connect-standalone.properties \
config/connect-file-source.properties \
config/connect-file-sink.properties
# rsyslog-kafka output module https://sematext.com/blog/recipe-rsyslog-apache-kafka-logstash/
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment