Created
May 26, 2023 08:33
-
-
Save anchitj/a0f90efd6b6afe02ded964b5277f608c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <stdio.h> | |
#include <string.h> | |
#include <librdkafka/rdkafka.h> | |
static void msg_callback(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) | |
{ | |
if (rkmessage->err) | |
{ | |
fprintf(stderr, "%% Message delivery failed: %s\n", rd_kafka_err2str(rkmessage->err)); | |
} | |
else | |
{ | |
fprintf(stderr, "%% Message delivered (%zd bytes, partition %" PRId32 ")\n", | |
rkmessage->len, rkmessage->partition); | |
} | |
} | |
static void logger_callback(const rd_kafka_t *rk, int level, | |
const char *fac, const char *buf) | |
{ | |
fprintf(stderr, "RDKAFKA-%i-%s: %s: %s\n", level, fac, rk ? rd_kafka_name(rk) : NULL, buf); | |
} | |
int main() | |
{ | |
const char *broker = "randomExample.com:9092"; | |
const char *topic = "tpc"; | |
rd_kafka_conf_t *conf = rd_kafka_conf_new(); | |
rd_kafka_conf_res_t result; | |
result = rd_kafka_conf_set(conf, "bootstrap.servers", broker, NULL, 0); | |
if (result != RD_KAFKA_CONF_OK) | |
{ | |
fprintf(stderr, "Failed to set bootstrap.servers: %s\n", broker); | |
return 1; | |
} | |
result = rd_kafka_conf_set(conf, "resolve.canonical.bootstrap.servers.only", "false", NULL, 0); | |
if (result != RD_KAFKA_CONF_OK) | |
{ | |
fprintf(stderr, "Failed to set resolve.canonical.bootstrap.servers.only: true\n"); | |
return 1; | |
} | |
result = rd_kafka_conf_set(conf, "security.protocol", "SASL_PLAINTEXT", NULL, 0); | |
if (result != RD_KAFKA_CONF_OK) | |
{ | |
fprintf(stderr, "Failed to set security.protocol: SASL_PLAINTEXT\n"); | |
return 1; | |
} | |
result = rd_kafka_conf_set(conf, "sasl.kerberos.service.name", "kafka", NULL, 0); | |
if (result != RD_KAFKA_CONF_OK) | |
{ | |
fprintf(stderr, "Failed to set sasl.kerberos.service.name: kafka\n"); | |
return 1; | |
} | |
result = rd_kafka_conf_set(conf, "sasl.kerberos.principal", "kafka_producer", NULL, 0); | |
if (result != RD_KAFKA_CONF_OK) | |
{ | |
fprintf(stderr, "Failed to set sasl.kerberos.principal: kafka_producer\n"); | |
return 1; | |
} | |
result = rd_kafka_conf_set(conf, "sasl.kerberos.keytab", "/var/lib/secret/kafka-client.key", NULL, 0); | |
if (result != RD_KAFKA_CONF_OK) | |
{ | |
fprintf(stderr, "Failed to set sasl.kerberos.keytab: /var/lib/secret/kafka-client.key\n"); | |
return 1; | |
} | |
// Set debug context to enable debugging | |
result = rd_kafka_conf_set(conf, "debug", "all", NULL, 0); | |
if (result != RD_KAFKA_CONF_OK) | |
{ | |
fprintf(stderr, "Failed to set debug: all\n"); | |
return 1; | |
} | |
// Set logger callback to print debug logs | |
rd_kafka_conf_set_log_cb(conf, logger_callback); | |
char errstr[512]; | |
rd_kafka_t *producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); | |
if (!producer) | |
{ | |
fprintf(stderr, "Failed to create producer: %s\n", errstr); | |
return 1; | |
} | |
rd_kafka_conf_set_dr_msg_cb(conf, msg_callback); | |
// Create a topic | |
rd_kafka_topic_conf_t *topic_conf = rd_kafka_topic_conf_new(); | |
rd_kafka_topic_t *rkt = rd_kafka_topic_new(producer, topic, topic_conf); | |
if (!rkt) | |
{ | |
fprintf(stderr, "Failed to create topic: %s\n", topic); | |
return 1; | |
} | |
// Produce a message | |
const char *message = "Hello, Kafka!"; | |
size_t message_len = strlen(message); | |
int partition = RD_KAFKA_PARTITION_UA; | |
int send_result = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, | |
(void *)message, message_len, NULL, 0, NULL); | |
if (send_result == -1) | |
{ | |
fprintf(stderr, "Failed to produce message: %s\n", rd_kafka_err2str(rd_kafka_last_error())); | |
rd_kafka_poll(producer, 0); | |
return 1; | |
} | |
// Wait for message delivery reports | |
rd_kafka_poll(producer, -1); | |
// Clean up | |
rd_kafka_topic_destroy(rkt); | |
rd_kafka_destroy(producer); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment