Kafka commands to help with troubleshooting
There are four main parts in a Kafka system:
Broker:
Handles all requests from clients (produce, consume, and metadata) and keeps data replicated within the cluster.
There can be one or more brokers in a cluster.
Zookeeper:
Keeps the state of the cluster (brokers, topics, users).
Producer:
Sends records to a broker.
Consumer:
Consumes batches of records from the broker.
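Since ZooKeeper keeps the cluster state, a quick sanity check is to list the broker IDs it has registered. A minimal sketch, assuming a local ZooKeeper on the default port (passing the command inline requires a reasonably recent Kafka distribution):
# List broker IDs currently registered in ZooKeeper
bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids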
# Configure a topic using custom settings
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 1 --replication-factor 1 \
--config max.message.bytes=64000 \
--config flush.messages=1
# To remove an override you can do
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --alter --delete-config max.message.bytes
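To verify which overrides are currently set on a topic, a sketch using the same tool:
# Describe current config overrides for the topic
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --describe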
# Find the server.properties file
sudo find /opt -type f -name server.properties | grep -v kraft
# Check broker service logs (via journald; unit names may vary by install)
journalctl -u kafka.service --since "1 hour ago"
# org.apache.kafka.clients.consumer.OffsetOutOfRangeException:
To avoid this, you can do one of the following:
πŸ‹ Set the auto.offset.reset config to either earliest or latest. More info: http://kafka.apache.org/documentation.html#consumerconfigs
πŸ‹ Get the smallest offset available for a topic partition by running the following Kafka command-line tool:
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list <broker-ip:9092> --topic <topic-name> --time -2
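The same tool returns the largest (latest) offset with --time -1, which is useful for seeing how far a consumer is behind:
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list <broker-ip:9092> --topic <topic-name> --time -1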
# Alter (increase) the partition count; Kafka only allows increasing partitions, never decreasing
bin/kafka-topics.sh --zookeeper zk_host:port --alter --topic <your_topic_name> --partitions <new_partition_count>
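To confirm the new partition count took effect, a sketch:
bin/kafka-topics.sh --zookeeper zk_host:port --describe --topic <your_topic_name>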
# delete all topics starting with "regex-"
bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic 'regex-.*'
NOTE: In cases where regex is not possible, we can use a comma-separated list of topic names for the deletion of topics.
If you are not able to delete a topic:
You cannot delete a topic while it is being consumed. Use bin/kafka-consumer-groups.sh or a simple ps aux | grep Consumer to find any consumers that may be blocking the operation.
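A sketch of using kafka-consumer-groups.sh to find active groups and see who is consuming the topic:
# List all consumer groups known to the cluster
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
# Show members, assigned partitions, and lag for one group
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group <group-name>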
Zookeeper UI: https://github.com/soabase/exhibitor
──────────────────────────────────────────────────────────────────────────────────────────────────────
How to purge all Kafka data for a fresh start (lab/test only):
$ ###### stop and clear all brokers
$ sudo systemctl stop kafka.service zookeeper.service
$ sudo rm -rf /var/log/kafka-logs/*
$ ###### continue ONLY after finishing the above on all brokers
$ sudo systemctl start zookeeper.service
$ sleep 10s # make sure zookeeper is ready
$ sudo systemctl start kafka.service
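Once the broker is back up, a quick check that the cluster responds (assuming a local broker on the default port):
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --list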
──────────────────────────────────────────────────────────────────────────────────────────────────────
What is Log Compaction?
Log compaction is a retention strategy that removes older records for which a more recent update with the same key exists.
Log compaction ensures that Kafka will always retain at least the last known value for each message key within the log of a single topic partition.
A message with a key and a null payload will be deleted from the log. In simple terms, Apache Kafka keeps the latest version of a record and deletes the older versions with the same key.
https://i0.wp.com/kafka.apache.org/27/images/log_cleaner_anatomy.png
Kafka log compaction allows consumers to restore their state from a compacted topic.
Compaction never re-orders messages; it only deletes older records that have been superseded by an update with the same key.
Also, the partition offset of a message never changes.
Each topic log is divided into two areas based on offsets: the head and the tail. Every new record is appended at the end of the head, and compaction happens in the tail.
How does log compaction work?
Compaction is done in the background by periodically recopying log segments. Cleaning does not block reads and can be throttled to a configurable amount of I/O throughput to avoid impacting producers and consumers. This strategy not only deletes duplicate records but also removes keys with null values. These records are known as tombstone records.
https://i0.wp.com/i.ibb.co/MMzpqHV/Untitled-Diagram-1.jpg
If you don’t have Kafka running locally, you can use this docker-compose file (https://github.com/knoldus/devops-recipe-kafka-tombstone/blob/feature/log-compcation/docker-compose.yml) to run Kafka on your system. Trying compaction out looks something like this:
$ kafka-topics.sh --create --zookeeper zookeeper:2181 --topic latest-product-price-3 --replication-factor 1 --partitions 1 --config cleanup.policy=compact --config delete.retention.ms=100 --config segment.ms=1 --config min.cleanable.dirty.ratio=0.00
$ kafka-console-producer.sh --broker-list localhost:9092 --topic latest-product-price-3 --property parse.key=true --property key.separator=:
$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic latest-product-price-3 --property print.key=true --property key.separator=: --from-beginning
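With the producer above (key.separator=:), you can type keyed records and watch compaction keep only the latest value per key. A sketch of sample input, with hypothetical product keys:
>p1:100
>p1:150
>p2:200
>p1:175
After compaction runs, the consumer reading --from-beginning should see only p2:200 and p1:175; producing p1 with a null payload (a tombstone) would eventually remove the key entirely.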
──────────────────────────────────────────────────────────────────────────────────────────────────────
# Removing Messages from a Kafka Topic
──────────────────────────────────────────────────────────────────────────────────────────────────────
πŸ‹ Option 1: Message Expiry
πŸ‹ Option 2: Record Deletion
πŸ‹ Option 3: Remove the Topic
Option 1: Message Expiry
The intended way of removing data from Kafka is to use one of the several configurable options for message expiry.
Expiry conditions are controlled by configuration parameters, and that can be based on how old messages are (sometimes called time to live or TTL) or the size of the topic.
Kafka's performance is effectively constant with respect to the amount of data retained, so keeping lots of data is not a problem.
Expiry conditions apply to all messages within a given topic and can be set when the topic is first created or modified later for topics that already exist.
There are three time-based configuration parameters that can be used.
Higher-precision values such as ms take precedence over lower precision values such as hours.
log.retention.hours: number of hours the message will be saved
log.retention.minutes: number of minutes
log.retention.ms: number of milliseconds
Size-based parameters include log.retention.bytes (applied per partition). When a size-based rule has been defined, messages are removed until the segments are below the configured size. If no expiry rules are defined, the data will be retained forever.
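As a sketch, these broker-level defaults might appear in server.properties (topic-level overrides such as retention.ms take precedence):
# Retain messages for 7 days, or until a partition exceeds ~1 GiB
log.retention.hours=168
log.retention.bytes=1073741824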
# Set the message retention time to one minute (60000 ms)
kafka-configs.sh --alter --zookeeper zookeeper:2181 \
--entity-type topics --entity-name test-topic \
--add-config retention.ms=60000
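After the old messages have expired, you would normally remove the override so the topic falls back to the broker default; a sketch:
# Remove the temporary retention override
kafka-configs.sh --alter --zookeeper zookeeper:2181 \
--entity-type topics --entity-name test-topic \
--delete-config retention.ms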
Option 2: Record Deletion
While message expiry will meet many use-cases, sometimes we want to delete records directly.
This can be done using the kafka-delete-records.sh tool to read a configuration file (JSON format) that describes the records we want to be deleted.
$ cat delete-test-topic.config
{
  "partitions": [
    {
      "topic": "test-topic",
      "partition": 0,
      "offset": -1
    }
  ],
  "version": 1
}
Set "topic" to your topic name (JSON does not allow inline comments). An offset of -1 means delete everything up to the partition's current high watermark.
# Delete topic messages using an offset JSON file
$ kafka-delete-records.sh --bootstrap-server kafka:9092 --offset-json-file delete-test-topic.config
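To confirm the deletion, the earliest available offset should have moved forward; a sketch using GetOffsetShell as earlier:
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka:9092 --topic test-topic --time -2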
Option 3: Remove the Topic
If the first two options don't quite meet your needs, you can also delete the topic and recreate it.
Important: this is only possible if the delete.topic.enable property is set to true in the Kafka server configuration.
# Remove test-topic from the Kafka server
kafka-topics.sh --zookeeper zookeeper:2181 --delete --topic test-topic
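Once the delete has been processed, you can recreate the topic; a sketch reusing a minimal configuration:
# Recreate the topic after deletion completes
kafka-topics.sh --zookeeper zookeeper:2181 --create --topic test-topic --partitions 1 --replication-factor 1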
References:
https://blog.knoldus.com/how-to-delete-record-from-kafka-topic-tombstone/