@cameronkerrnz
Last active June 1, 2021 01:13
Export Kafka timestamp of earliest message in a topic/partition
# Example cron entry (e.g. in /etc/cron.d); replace TEXTFILE_COLLECTOR_PATH with
# the directory your node exporter's textfile collector watches.
*/5 * * * * root /usr/local/bin/kafka_exporter_textfile_extras > TEXTFILE_COLLECTOR_PATH/kafka_exporter_textfile_extras.prom
#!/bin/bash
#
# When using Kafka as a queue, you often have some time/space-based
# retention policies. If you're relying on Kafka as a way to repeat
# processing after an error, it can be important to know how much of
# a time window you have available to rewind.
#
# kafka_exporter is a useful tool, but while it does export the
# earliest and latest offsets in a topic, it doesn't export them as
# timestamps. Recent versions of Kafka include a timestamp in each
# message; we can therefore query each topic/partition to give
# us the timestamp of the earliest message in the topic/partition.
#
# Even though this reads only a single topic/partition, it does create
# a consumer group temporarily, which Kafka then has to balance.
# As a result, I would recommend not running this script overly
# frequently; once per five minutes should be ample.
#
# To run this, schedule it as per your needs; redirect its output
# to a file (with an extension of .prom) and configure your node
# exporter with --collector.textfile.directory

kafka_topic_earliest_time_seconds() {
    local topic="$1"
    local partition="$2"
    # Read just the first (earliest) message, printing only its timestamp,
    # then reshape "CreateTime:<epoch-millis>" into a Prometheus metric line
    # (converting milliseconds to seconds).
    kafka-console-consumer \
        --bootstrap-server 127.0.0.1:9092 --timeout-ms 10000 \
        --offset earliest --max-messages 1 \
        --topic "$topic" --partition "$partition" \
        --property print.timestamp=true --property print.value=false \
        2>&1 \
        | awk -F: -v topic="$topic" -v partition="$partition" '
            $1 == "CreateTime" {
                print("kafka_topic_partition_earliest_time_seconds{topic=\"" topic "\",partition=\"" partition "\"} " $2/1000)
            }'
}
echo "# HELP kafka_topic_partition_earliest_time_seconds Timestamp of the earliest message currently in the topic/partition"
echo "# TYPE kafka_topic_partition_earliest_time_seconds gauge"
# CHANGE THIS TO SUIT
kafka_topic_earliest_time_seconds your_topic_here 0
kafka_topic_earliest_time_seconds another_topic_here 0
kafka_topic_earliest_time_seconds another_topic_here 1
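The awk transform at the heart of the script can be checked in isolation by feeding it a line in the shape kafka-console-consumer emits with print.timestamp=true (the topic name and epoch-milliseconds value here are made up for illustration):

```shell
#!/bin/sh
# Simulate one line of kafka-console-consumer output: "CreateTime:<epoch-millis>\t"
printf 'CreateTime:1622509200000\t\n' \
  | awk -F: -v topic="demo" -v partition="0" '
      $1 == "CreateTime" {
          # Split on ":", take the millisecond timestamp, convert to seconds
          print("kafka_topic_partition_earliest_time_seconds{topic=\"" topic "\",partition=\"" partition "\"} " $2/1000)
      }'
# → kafka_topic_partition_earliest_time_seconds{topic="demo",partition="0"} 1622509200
```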

I've been working on improving operational visibility of a system that uses Kafka as its data backbone, with topics that have a combination of size- and age-based retention policies (and no compaction). One of the key metrics I want to track, before I start making changes, is how much time is actually held in some of these topics: a longer span of retained time means a longer recovery range if I need to reprocess that data. Although kafka_exporter gives me useful visibility of topic lag, Kafka doesn't provide any tools specifically for reporting on the timestamp of the earliest message. With the aid of kafka-console-consumer, some basic scripting, Prometheus and Grafana, I now have the visibility I need, and could set up some useful thresholds for alerting.

To use with Grafana

To use this in a Grafana dashboard, here's a sample Prometheus query you can use as part of a Singlestat panel (format the Singlestat with a unit of 'seconds'):

time() - max(kafka_topic_partition_earliest_time_seconds{topic=~"$backbone_topics"})
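Since the point of tracking this is recovery headroom, the same expression also works as a Prometheus alerting rule. A sketch, where the 24-hour threshold, rule names and severity label are assumptions to adjust for your own retention targets:

```yaml
groups:
  - name: kafka-retention
    rules:
      - alert: KafkaTopicRetentionWindowShort
        # Fire when the earliest retained message is less than 24h old,
        # i.e. less than a day of replayable data remains in the topic.
        expr: time() - max by (topic) (kafka_topic_partition_earliest_time_seconds) < 86400
        for: 30m
        labels:
          severity: warning
        annotations:
          summary: "Topic {{ $labels.topic }} holds less than 24h of data"
```

Using max over the partitions' earliest timestamps is the conservative choice: it alerts on the partition with the least history.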

Here I've used a Grafana variable called backbone_topics; this is derived as a Prometheus query as follows. I have also added a Regex constraint to match only the topics of interest. Alternatively you might create a variable of the 'Custom' type and specify the topics as a list.

label_values(kafka_topic_partition_current_offset, topic)
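The Regex constraint mentioned above depends on your naming convention; assuming, purely for illustration, that the backbone topics share a 'backbone.' prefix, the variable's Regex field might be:

```
/^backbone\..*/
```

Only topic names matching that pattern then populate $backbone_topics.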