Skip to content

Instantly share code, notes, and snippets.

@zilder
Created September 24, 2019 14:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zilder/e79240f791cf0996bfd31766ac07e42d to your computer and use it in GitHub Desktop.
Save zilder/e79240f791cf0996bfd31766ac07e42d to your computer and use it in GitHub Desktop.
Sample code that uses `rd_kafka_query_watermark_offsets()`
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include "librdkafka/rdkafka.h"
/* Broker list handed to rd_kafka_brokers_add() in main(). */
#define BROKERS_LIST "localhost"
/* Name of the topic whose per-partition watermark offsets are queried. */
#define TOPIC "my_topic"
/*
 * Report a fatal error and terminate the program.
 *
 * Writes msg followed by a newline to stderr and exits with status 1.
 * msg is printed through "%s", so any '%' characters in it are emitted
 * literally rather than being interpreted as conversion specifiers.
 * The do { } while (0) wrapper makes the macro behave as a single
 * statement, e.g. as the body of an unbraced if.
 */
#define error(msg) \
    do { \
        fprintf(stderr, "%s\n", (msg)); \
        exit(1); \
    } while (0)
/*
 * Print the low and high watermark offsets of every partition of TOPIC.
 *
 * Creates a consumer handle, adds BROKERS_LIST to it, fetches the metadata
 * of TOPIC to enumerate its partitions, and queries each partition with
 * rd_kafka_query_watermark_offsets(). A partition reported as unknown is
 * skipped; every other failure prints a message and exits with status 1.
 *
 * Returns 0 on success.
 */
int main(void)
{
    char errstr[512];
    rd_kafka_conf_t *conf;
    rd_kafka_topic_conf_t *topic_conf;
    rd_kafka_t *kafka_handle;
    rd_kafka_topic_t *kafka_topic_handle;
    rd_kafka_resp_err_t err;
    const struct rd_kafka_metadata *metadata;
    const struct rd_kafka_metadata_topic *topic;
    int i;

    conf = rd_kafka_conf_new();
    kafka_handle = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
    if (kafka_handle == NULL) {
        /* errstr was filled in by rd_kafka_new() with the failure reason. */
        fprintf(stderr, "Unable to create Kafka handle: %s\n", errstr);
        exit(1);
    }

    /* Add brokers */
    if (rd_kafka_brokers_add(kafka_handle, BROKERS_LIST) < 1) {
        rd_kafka_destroy(kafka_handle);
        error("No valid brokers specified");
    }

    /* Create topic handle */
    topic_conf = rd_kafka_topic_conf_new();
    if (rd_kafka_topic_conf_set(topic_conf, "auto.commit.enable", "false",
                                errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        error("Unable to configure topic");
    }

    kafka_topic_handle = rd_kafka_topic_new(kafka_handle, TOPIC, topic_conf);
    if (!kafka_topic_handle) {
        error("Unable to create topic");
    }
    topic_conf = NULL; /* Now owned by kafka_topic_handle */

    /* Fetch metadata for this one topic only (all_topics == 0). */
    err = rd_kafka_metadata(kafka_handle, 0, kafka_topic_handle, &metadata, 5000);
    if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
        error("Failed to acquire metadata");
    }
    if (metadata->topic_cnt != 1) {
        error("Expected a single topic");
    }
    topic = &metadata->topics[0];

    for (i = 0; i < topic->partition_cnt; ++i) {
        int64_t low, high;

        err = rd_kafka_query_watermark_offsets(kafka_handle,
                                               TOPIC,
                                               topic->partitions[i].id,
                                               &low, &high,
                                               1000);
        if (err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION) {
            /* low/high were not written; do not print garbage values. */
            printf("partition %i: unknown partition, skipped\n",
                   (int) topic->partitions[i].id);
            continue;
        }
        if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
            error("Failed to get watermarks");
        }

        /* PRId64 matches int64_t on every platform, unlike %li. */
        printf("partition %i: (low, high)->(%" PRId64 ", %" PRId64 ")\n",
               (int) topic->partitions[i].id, low, high);
    }

    /* Release everything in reverse order of acquisition. */
    rd_kafka_metadata_destroy(metadata);
    rd_kafka_topic_destroy(kafka_topic_handle);
    rd_kafka_destroy(kafka_handle);

    printf("all done!\n");
    return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment