Skip to content

Instantly share code, notes, and snippets.

@anchitj
Created May 26, 2023 08:33
Show Gist options
  • Save anchitj/a0f90efd6b6afe02ded964b5277f608c to your computer and use it in GitHub Desktop.
Save anchitj/a0f90efd6b6afe02ded964b5277f608c to your computer and use it in GitHub Desktop.
#include <inttypes.h>
#include <stdio.h>
#include <string.h>
#include <librdkafka/rdkafka.h>
/*
 * Delivery report callback, registered in main() via
 * rd_kafka_conf_set_dr_msg_cb().
 *
 * Prints the outcome of one produced message to stderr: the error string
 * when rkmessage->err is set, otherwise the payload length and the partition
 * the message was written to.
 *
 * rk and opaque are part of the callback signature but are not used here.
 */
static void msg_callback(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque)
{
    (void)rk;
    (void)opaque;

    if (rkmessage->err)
    {
        fprintf(stderr, "%% Message delivery failed: %s\n", rd_kafka_err2str(rkmessage->err));
        return;
    }

    /* len is a size_t, so it must be printed with %zu (not the signed %zd). */
    fprintf(stderr, "%% Message delivered (%zu bytes, partition %" PRId32 ")\n",
            rkmessage->len, rkmessage->partition);
}
/*
 * Log callback, registered in main() via rd_kafka_conf_set_log_cb().
 *
 * Writes one line to stderr in the form
 *   RDKAFKA-<level>-<facility>: <client name>: <message>
 *
 * rk may be NULL; in that case the client name is printed as "(null)".
 * Passing an actual NULL pointer to a %s conversion is undefined behaviour,
 * so a string literal is substituted instead.
 */
static void logger_callback(const rd_kafka_t *rk, int level,
                            const char *fac, const char *buf)
{
    const char *client_name = rk ? rd_kafka_name(rk) : "(null)";

    fprintf(stderr, "RDKAFKA-%i-%s: %s: %s\n", level, fac, client_name, buf);
}
/*
 * Sets one configuration property on conf.
 *
 * Returns 0 on success. On failure prints the property name, the value that
 * was rejected and the reason reported by librdkafka to stderr, and
 * returns -1.
 */
static int set_conf_property(rd_kafka_conf_t *conf, const char *name, const char *value)
{
    char errstr[512];

    if (rd_kafka_conf_set(conf, name, value, errstr, sizeof(errstr)) == RD_KAFKA_CONF_OK)
    {
        return 0;
    }

    fprintf(stderr, "Failed to set %s: %s\n", name, value);
    fprintf(stderr, "%% %s\n", errstr);
    return -1;
}

/*
 * Creates a Kafka producer configured for SASL_PLAINTEXT with Kerberos,
 * enables full debug logging, produces a single message to topic "tpc" and
 * waits for its delivery report.
 *
 * Returns 0 when the message was handed to the producer and the out-queue
 * drained, 1 on any configuration, creation or produce failure.
 */
int main(void)
{
    /* Properties applied in order; the first failure aborts start-up. */
    static const struct
    {
        const char *name;
        const char *value;
    } settings[] = {
        {"bootstrap.servers", "randomExample.com:9092"},
        {"resolve.canonical.bootstrap.servers.only", "false"},
        {"security.protocol", "SASL_PLAINTEXT"},
        {"sasl.kerberos.service.name", "kafka"},
        {"sasl.kerberos.principal", "kafka_producer"},
        {"sasl.kerberos.keytab", "/var/lib/secret/kafka-client.key"},
        /* Enable every debug context so the log callback sees everything. */
        {"debug", "all"},
    };
    const char *topic = "tpc";
    const char *message = "Hello, Kafka!";
    size_t message_len = strlen(message);
    int exit_code = 1;
    rd_kafka_t *producer = NULL;
    rd_kafka_topic_t *rkt = NULL;
    rd_kafka_topic_conf_t *topic_conf = NULL;
    char errstr[512];

    rd_kafka_conf_t *conf = rd_kafka_conf_new();

    for (size_t i = 0; i < sizeof(settings) / sizeof(settings[0]); i++)
    {
        if (set_conf_property(conf, settings[i].name, settings[i].value) != 0)
        {
            goto cleanup;
        }
    }

    /*
     * Both callbacks must be installed BEFORE rd_kafka_new(): on success that
     * call takes ownership of conf, after which the application must not
     * touch it any more.
     */
    rd_kafka_conf_set_log_cb(conf, logger_callback);
    rd_kafka_conf_set_dr_msg_cb(conf, msg_callback);

    producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    if (!producer)
    {
        /* conf is still owned by us on failure and is released in cleanup. */
        fprintf(stderr, "Failed to create producer: %s\n", errstr);
        goto cleanup;
    }
    conf = NULL; /* now owned by the producer */

    /* Create a topic handle; topic_conf is handed over to rd_kafka_topic_new(). */
    topic_conf = rd_kafka_topic_conf_new();
    rkt = rd_kafka_topic_new(producer, topic, topic_conf);
    if (!rkt)
    {
        fprintf(stderr, "Failed to create topic: %s\n", topic);
        goto cleanup;
    }

    /*
     * Produce a message. RD_KAFKA_MSG_F_COPY makes librdkafka copy the
     * payload, so casting away const on the string literal is safe here.
     */
    if (rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
                         (void *)message, message_len, NULL, 0, NULL) == -1)
    {
        fprintf(stderr, "Failed to produce message: %s\n", rd_kafka_err2str(rd_kafka_last_error()));
        rd_kafka_poll(producer, 0);
        goto cleanup;
    }

    /*
     * Wait for delivery reports. A single poll serves at most one batch of
     * events, so keep polling until nothing is left in the out-queue.
     */
    while (rd_kafka_outq_len(producer) > 0)
    {
        rd_kafka_poll(producer, 100);
    }

    exit_code = 0;

cleanup:
    /* Release whatever was acquired, in reverse order of creation. */
    if (rkt)
    {
        rd_kafka_topic_destroy(rkt);
    }
    if (producer)
    {
        rd_kafka_destroy(producer);
    }
    if (conf)
    {
        rd_kafka_conf_destroy(conf);
    }
    return exit_code;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment