Skip to content

Instantly share code, notes, and snippets.

@aliabidzaidi
Created November 3, 2021 07:24
Show Gist options
  • Save aliabidzaidi/5ab0a313d282d3ebb8cb4a63589e53b8 to your computer and use it in GitHub Desktop.
Save aliabidzaidi/5ab0a313d282d3ebb8cb4a63589e53b8 to your computer and use it in GitHub Desktop.
Kafka Pushes into 1 out of 4 partitions only
// To Compile & Run
// gcc -o test2 test2.c -lrdkafka -ljansson
// ./test2
#include <inttypes.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include "kafkaProducer.h"
#include <librdkafka/rdkafka.h>
#include <jansson.h>
/**
 * Delivery report callback: logs the outcome of one produced message.
 *
 * Prints either the delivery error or the delivered size and partition
 * to stderr, depending on rkmessage->err.
 *
 * @param rk        Producer handle (unused).
 * @param rkmessage Message whose delivery outcome is being reported.
 * @param opaque    Application opaque pointer (unused).
 */
static void dr_msg_cb(rd_kafka_t *rk,
                      const rd_kafka_message_t *rkmessage, void *opaque)
{
    (void)rk;
    (void)opaque;

    if (rkmessage->err) {
        fprintf(stderr, "%% Message delivery failed: %s\n",
                rd_kafka_err2str(rkmessage->err));
    } else {
        /* len is a size_t, so the matching conversion is %zu, not %zd. */
        fprintf(stderr, "%% Message delivered (%zu bytes, partition %" PRId32 ")\n",
                rkmessage->len, rkmessage->partition);
    }
}
/**
 * Create a Kafka producer for the hard-coded broker list.
 *
 * Builds a configuration with "bootstrap.servers" set and dr_msg_cb
 * installed as the delivery report callback, then creates the producer
 * from it. Progress is printed to stdout, errors to stderr.
 *
 * @return Producer handle on success, or NULL on failure (after the error
 *         has been printed). No configuration object is leaked on failure.
 */
rd_kafka_t *kafka_init()
{
    const char *brokers = "10.120.1.134:9092"; /* Argument: broker list */
    char errstr[512];
    rd_kafka_conf_t *conf = rd_kafka_conf_new(); /* Temporary configuration object */
    rd_kafka_t *rk;                              /* Producer instance handle */

    printf("Setting configurations\n");
    if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
    {
        fprintf(stderr, "%s\n", errstr);
        rd_kafka_conf_destroy(conf);
        return NULL;
    }
    rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
    printf("Configurations set, now creating producer \n");

    rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    if (!rk)
    {
        fprintf(stderr, "%% Failed to create new producer: %s\n", errstr);
        /* rd_kafka_new() only takes ownership of conf on success, so it
         * still belongs to us here and must be released. */
        rd_kafka_conf_destroy(conf);
        return NULL;
    }

    printf("Producer created successfully\n");
    return rk;
}
/**
 * Enqueue one message for asynchronous delivery to a Kafka topic.
 *
 * The payload is passed with RD_KAFKA_MSG_F_COPY, so librdkafka copies it
 * and the caller keeps ownership of event. When the local producer queue is
 * full, callbacks are served for up to one second and the enqueue is retried.
 *
 * @param rk    Producer handle.
 * @param event NUL-terminated payload; strlen(event) bytes are sent.
 * @param topic Name of the destination topic.
 * @return 0 once the message has been enqueued; -1 on a NULL argument or on
 *         any produce error other than a full queue.
 */
int produce_to_topic(rd_kafka_t *rk, char *event, char *topic)
{
    rd_kafka_resp_err_t err;
    size_t event_len;

    if (!rk || !event || !topic) {
        fprintf(stderr, "%% Failed to produce: NULL producer, event or topic\n");
        return -1;
    }
    event_len = strlen(event);

    for (;;) {
        err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
                                RD_KAFKA_V_VALUE(event, event_len), RD_KAFKA_V_OPAQUE(NULL), RD_KAFKA_V_END);
        if (!err) {
            break;
        }

        fprintf(stderr, "%% Failed to produce to topic %s: %s\n", topic, rd_kafka_err2str(err));
        if (err != RD_KAFKA_RESP_ERR__QUEUE_FULL) {
            /* Not retryable here: report the failure instead of claiming success. */
            return -1;
        }

        /* Queue full: serve delivery reports to free space, then retry. */
        rd_kafka_poll(rk, 1000 /*block for max 1000ms*/);
    }

    /* Non-blocking poll so delivery report callbacks are served regularly
     * rather than only at flush time. */
    rd_kafka_poll(rk, 0);
    return 0;
}
/**
 * Flush outstanding messages and destroy the producer.
 *
 * Waits up to 2 seconds for queued messages to be delivered, reports on
 * stderr how many are still in the out queue afterwards, then destroys
 * the producer handle.
 *
 * @param rk Producer handle; it must not be used after this call.
 * @return Out queue length observed after the flush wait (0 when everything
 *         was delivered).
 */
int kafka_destory(rd_kafka_t *rk)
{
    int undelivered;

    fprintf(stderr, "%% Flushing final messages..\n");
    rd_kafka_flush(rk, 2 * 1000 /* wait for max 2 seconds */);

    /* Read the queue length once so the check and the log line agree. */
    undelivered = rd_kafka_outq_len(rk);
    if (undelivered > 0) {
        fprintf(stderr, "%% %d message(s) were not delivered\n", undelivered);
    }

    rd_kafka_destroy(rk);
    return undelivered;
}
char *create_data()
{
json_t *root = json_object();
json_object_set_new(root, "uuid", json_string("1234567890123456789012345678901234567"));
json_object_set_new(root, "classification_path", json_string("http.tls"));
json_object_set_new(root, "packet_uploads", json_integer(44));
json_object_set_new(root, "packet_download", json_integer(87));
json_object_set_new(root, "vol_upload", json_integer(100));
json_object_set_new(root, "vol_download", json_integer(2000));
json_object_set_new(root, "src_addr", json_string("192.168.7.1"));
json_object_set_new(root, "dest_addr", json_string("213.12.54.22"));
json_object_set_new(root, "src_port", json_integer(443));
json_object_set_new(root, "dest_port", json_integer(56443));
char *str = json_dumps(root, 0);
json_decref(root);
return str;
}
/* Cleared by request_stop() to end the produce loop in main(). */
static volatile sig_atomic_t keep_running = 1;

/* Signal handler: ask the produce loop to stop so cleanup can run. */
static void request_stop(int sig)
{
    keep_running = 0;
    /* Restore the default action so a second signal terminates the process
     * even if the program is blocked somewhere else. */
    signal(sig, SIG_DFL);
}

/**
 * Demo entry point: create the producer, build one JSON payload and send it
 * to topic "radius_queue" roughly every half second until SIGINT or SIGTERM
 * arrives, then flush, destroy the producer and free the payload.
 *
 * @return 0 after a clean shutdown, 101 if the producer could not be
 *         created, 102 if the payload could not be built.
 */
int main()
{
    rd_kafka_t *rdKafka = kafka_init();
    if (rdKafka == NULL)
    {
        printf("rd_kafka_t unable to set\n");
        return 101;
    }

    char *dt = create_data();
    if (dt == NULL)
    {
        fprintf(stderr, "%% Failed to build JSON payload\n");
        kafka_destory(rdKafka);
        return 102;
    }

    signal(SIGINT, request_stop);
    signal(SIGTERM, request_stop);

    /* sleep() takes whole seconds, so sleep(0.5) truncated to sleep(0);
     * nanosleep() gives the intended half-second pause. */
    const struct timespec send_interval = { .tv_sec = 0, .tv_nsec = 500L * 1000L * 1000L };

    while (keep_running)
    {
        produce_to_topic(rdKafka, dt, "radius_queue");
        nanosleep(&send_interval, NULL);
    }

    kafka_destory(rdKafka);
    free(dt);
    return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment