Created
November 3, 2021 07:24
-
-
Save aliabidzaidi/5ab0a313d282d3ebb8cb4a63589e53b8 to your computer and use it in GitHub Desktop.
Kafka Pushes into 1 out of 4 partitions only
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// To Compile & Run | |
// gcc -o test2 test2.c -lrdkafka -ljansson | |
// ./test2 | |
#include <stdio.h> | |
#include <unistd.h> | |
#include "kafkaProducer.h" | |
#include <librdkafka/rdkafka.h> | |
#include <jansson.h> | |
static void dr_msg_cb(rd_kafka_t *rk, | |
const rd_kafka_message_t *rkmessage, void *opaque) | |
{ | |
if (rkmessage->err) | |
fprintf(stderr, "%% Message delivery failed: %s\n", | |
rd_kafka_err2str(rkmessage->err)); | |
else | |
fprintf(stderr, "%% Message delivered (%zd bytes, partition %" PRId32 ")\n", rkmessage->len, rkmessage->partition); | |
} | |
rd_kafka_t *kafka_init() | |
{ | |
rd_kafka_t *rk; /* Producer instance handle */ | |
rd_kafka_conf_t *conf; /* Temporary configuration object */ | |
char errstr[512]; | |
const char *brokers = "10.120.1.134:9092"; /* Argument: broker list */ | |
conf = rd_kafka_conf_new(); | |
printf("Setting configurations\n"); | |
if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) | |
{ | |
fprintf(stderr, "%s\n", errstr); | |
return NULL; | |
} | |
rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb); | |
printf("Configurations set, now creating producer \n"); | |
rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); | |
if (!rk) | |
{ | |
fprintf(stderr, "%% Failed to create new producer: %s\n", errstr); | |
return NULL; | |
} | |
printf("Producer created successfully\n"); | |
return rk; | |
} | |
int produce_to_topic(rd_kafka_t *rk, char *event, char *topic) | |
{ | |
rd_kafka_resp_err_t err; | |
retry: | |
err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), | |
RD_KAFKA_V_VALUE(event, strlen(event)), RD_KAFKA_V_OPAQUE(NULL), RD_KAFKA_V_END); | |
if (err) | |
{ | |
fprintf(stderr, "%% Failed to produce to topic %s: %s\n", topic, rd_kafka_err2str(err)); | |
if (err == RD_KAFKA_RESP_ERR__QUEUE_FULL) | |
{ | |
rd_kafka_poll(rk, 1000 /*block for max 1000ms*/); | |
goto retry; | |
} | |
} | |
else | |
{ | |
// fprintf(stderr, "%% Enqueued message (%zd bytes) " | |
// "for topic %s\n", | |
// strlen(event), topic); | |
} | |
return 0; | |
} | |
int kafka_destory(rd_kafka_t *rk) | |
{ | |
fprintf(stderr, "%% Flushing final messages..\n"); | |
rd_kafka_flush(rk, 2 * 1000 /* wait for max 10 seconds */); | |
if (rd_kafka_outq_len(rk) > 0) | |
fprintf(stderr, "%% %d message(s) were not delivered\n", | |
rd_kafka_outq_len(rk)); | |
rd_kafka_destroy(rk); | |
} | |
char *create_data() | |
{ | |
json_t *root = json_object(); | |
json_object_set_new(root, "uuid", json_string("1234567890123456789012345678901234567")); | |
json_object_set_new(root, "classification_path", json_string("http.tls")); | |
json_object_set_new(root, "packet_uploads", json_integer(44)); | |
json_object_set_new(root, "packet_download", json_integer(87)); | |
json_object_set_new(root, "vol_upload", json_integer(100)); | |
json_object_set_new(root, "vol_download", json_integer(2000)); | |
json_object_set_new(root, "src_addr", json_string("192.168.7.1")); | |
json_object_set_new(root, "dest_addr", json_string("213.12.54.22")); | |
json_object_set_new(root, "src_port", json_integer(443)); | |
json_object_set_new(root, "dest_port", json_integer(56443)); | |
char *str = json_dumps(root, 0); | |
json_decref(root); | |
return str; | |
} | |
int main() | |
{ | |
rd_kafka_t *rdKafka = kafka_init(); | |
if(rdKafka == NULL) | |
{ | |
printf("rd_kafka_t unable to set\n"); | |
return 101; | |
} | |
char *dt = create_data(); | |
while (1) | |
{ | |
produce_to_topic(rdKafka, dt, "radius_queue"); | |
sleep(0.5); | |
} | |
kafka_destory(rdKafka); | |
free(dt); | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment