Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save jeansymolanza/0bca31db9728ae59fd6391ec5255d03b to your computer and use it in GitHub Desktop.
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <signal.h>
#include <inttypes.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <pthread.h>
#include <librdkafka/rdkafka.h>
#define PIPE_NAME "/path/to/pipe"
#define STOP_FILE "/usr/ud/batch/flags/g1_kafka.STOP"
#define POLL_INTERVAL 1 // seconds
#define NUM_THREADS 4 // Number of threads in the pool
volatile sig_atomic_t stop_requested = 0;
pthread_mutex_t kafka_lock;
// Delivery report callback: invoked from rd_kafka_poll()/rd_kafka_flush()
// exactly once per produced message, reporting final success or failure.
static void dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) {
    if (rkmessage->err) {
        fprintf(stderr, "Message delivery failed: %s\n", rd_kafka_err2str(rkmessage->err));
    } else {
        // rkmessage->offset is int64_t; the original "%ld" specifier is wrong
        // wherever long is 32 bits (32-bit Linux, 64-bit Windows) — use PRId64.
        printf("Message delivered to topic %s [%d] at offset %" PRId64 "\n",
               rd_kafka_topic_name(rkmessage->rkt),
               rkmessage->partition,
               rkmessage->offset);
    }
}
// SIGINT/SIGTERM handler: record the shutdown request in a sig_atomic_t
// flag polled by the main loop. Only async-signal-safe work is done here.
void handle_signal(int sig) {
    (void)sig;  // same action for every handled signal
    stop_requested = 1;
}
// Worker-thread entry point: publishes one pipe line to Kafka, then exits.
// `arg` is a heap-allocated NUL-terminated string (strdup'd by main);
// ownership transfers to this thread, which frees it before returning.
void* process_message(void* arg) {
char* message = (char*)arg;
// BUG(review): pthread_getspecific() takes a pthread_key_t created with
// pthread_key_create(); passing pthread_self() (a thread id) — let alone
// pthread_self() + 1 — is invalid, and no code anywhere stores these
// values, so rk/rkt are effectively indeterminate/NULL here and the
// produce call below cannot work. The producer/topic handles need to be
// delivered to the thread explicitly, e.g. packed into a struct passed
// via `arg`, or as file-scope globals set by main before threads start.
rd_kafka_t *rk = (rd_kafka_t*)pthread_getspecific(pthread_self());
rd_kafka_topic_t *rkt = (rd_kafka_topic_t*)pthread_getspecific(pthread_self() + 1);
// NOTE(review): this mutex serializes every producer thread, so the
// "thread pool" adds no parallelism — rd_kafka_produce() is thread-safe
// and does not need external locking.
pthread_mutex_lock(&kafka_lock);
// Produce message asynchronously; RD_KAFKA_MSG_F_COPY makes librdkafka
// copy the payload, so `message` can be freed immediately afterwards.
if (rd_kafka_produce(
rkt, RD_KAFKA_PARTITION_UA,
RD_KAFKA_MSG_F_COPY,
message, strlen(message),
NULL, 0,
NULL) == -1) {
fprintf(stderr, "%% Failed to produce to topic %s: %s\n", rd_kafka_topic_name(rkt), rd_kafka_err2str(rd_kafka_last_error()));
}
// Non-blocking poll to fire pending delivery-report callbacks (dr_msg_cb)
rd_kafka_poll(rk, 0);
pthread_mutex_unlock(&kafka_lock);
free(message);
return NULL;
}
// Look up environment variable `name`; on success store its value in *out
// and return 0, otherwise print a diagnostic and return 1.
static int require_env(const char *name, const char **out) {
    *out = getenv(name);
    if (*out == NULL) {
        fprintf(stderr, "Missing environment variable: %s\n", name);
        return 1;
    }
    return 0;
}

// Set one librdkafka configuration property, printing librdkafka's error
// string on failure. Returns 0 on success, 1 on error.
static int set_conf(rd_kafka_conf_t *conf, const char *key, const char *value) {
    char errstr[512];
    if (rd_kafka_conf_set(conf, key, value, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%% %s error: %s\n", key, errstr);
        return 1;
    }
    return 0;
}

// Entry point. "start" runs the pipe->Kafka forwarder until SIGINT/SIGTERM
// arrives or STOP_FILE appears; "stop" creates STOP_FILE so a running
// "start" instance exits its main loop. Returns 0 on success, 1 on error.
int main(int argc, char **argv) {
    if (argc != 2 || (strcmp(argv[1], "start") != 0 && strcmp(argv[1], "stop") != 0)) {
        fprintf(stderr, "Usage: %s {start|stop}\n", argv[0]);
        return 1;
    }

    if (strcmp(argv[1], "stop") == 0) {
        // Signal the running "start" instance by creating the stop flag file.
        FILE *stop_file = fopen(STOP_FILE, "w");
        if (stop_file == NULL) {
            // Original silently ignored this failure, leaving the daemon running.
            fprintf(stderr, "%% Failed to create stop file: %s\n", STOP_FILE);
            return 1;
        }
        fclose(stop_file);
        return 0;
    }

    /* ---------------- "start" mode ---------------- */

    const char *brokers, *topic, *jaas_conf, *krb5_conf;
    const char *keystore, *keystore_password, *truststore, *truststore_password;
    int missing_vars = 0;
    missing_vars += require_env("KAFKA_BOOTSTRAP_SERVERS", &brokers);
    missing_vars += require_env("KAFKA_TOPIC", &topic);
    missing_vars += require_env("KAFKA_JAAS_CONF", &jaas_conf);
    missing_vars += require_env("KAFKA_KRB5_CONF", &krb5_conf);
    missing_vars += require_env("KAFKA_KEYSTORE", &keystore);
    missing_vars += require_env("KAFKA_KEYSTORE_PASSWORD", &keystore_password);
    missing_vars += require_env("KAFKA_TRUSTSTORE", &truststore);
    missing_vars += require_env("KAFKA_TRUSTSTORE_PASSWORD", &truststore_password);
    // All missing variables are reported before bailing out.
    if (missing_vars > 0) {
        return 1;
    }

    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);

    // Buffering/batch tuning plus SSL + Kerberos (GSSAPI over SASL_SSL).
    // The original ignored errors from the first three settings; all are now
    // checked uniformly. NOTE(review): "sasl.jaas.config" is a Java-client
    // property — librdkafka is likely to reject it as unknown; confirm against
    // the librdkafka CONFIGURATION reference for the deployed version.
    if (set_conf(conf, "queue.buffering.max.messages", "1000000") ||
        set_conf(conf, "batch.num.messages", "1000") ||
        set_conf(conf, "queue.buffering.max.ms", "1") ||
        set_conf(conf, "ssl.keystore.location", keystore) ||
        set_conf(conf, "ssl.keystore.password", keystore_password) ||
        set_conf(conf, "ssl.ca.location", truststore) ||
        set_conf(conf, "sasl.jaas.config", jaas_conf) ||
        set_conf(conf, "sasl.kerberos.service.name", "kafka") ||
        set_conf(conf, "security.protocol", "SASL_SSL") ||
        set_conf(conf, "sasl.mechanism", "GSSAPI")) {
        rd_kafka_conf_destroy(conf);
        return 1;
    }

    // Point the Kerberos library at the supplied krb5.conf.
    setenv("KRB5_CONFIG", krb5_conf, 1);

    char errstr[512];
    rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    if (!rk) {
        fprintf(stderr, "%% Failed to create new producer: %s\n", errstr);
        // rd_kafka_new() only takes ownership of conf on success;
        // the original leaked it here.
        rd_kafka_conf_destroy(conf);
        return 1;
    }

    if (rd_kafka_brokers_add(rk, brokers) == 0) {
        fprintf(stderr, "%% No valid brokers specified\n");
        rd_kafka_destroy(rk);
        return 1;
    }

    rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, topic, NULL);
    if (!rkt) {
        fprintf(stderr, "%% Failed to create topic object: %s\n", rd_kafka_err2str(rd_kafka_last_error()));
        rd_kafka_destroy(rk);
        return 1;
    }

    // Create the named pipe; EEXIST just means it is already there.
    if (mkfifo(PIPE_NAME, 0666) == -1 && errno != EEXIST) {
        fprintf(stderr, "%% Failed to create pipe: %s\n", PIPE_NAME);
        rd_kafka_topic_destroy(rkt);
        rd_kafka_destroy(rk);
        return 1;
    }

    // Graceful shutdown on SIGINT/SIGTERM via the stop_requested flag.
    signal(SIGINT, handle_signal);
    signal(SIGTERM, handle_signal);
    pthread_mutex_init(&kafka_lock, NULL);

    // Main loop: (re)open the FIFO and forward each line as one message.
    while (!stop_requested) {
        if (access(STOP_FILE, F_OK) == 0) {
            break;  // operator requested shutdown via the stop file
        }
        int pipe_fd = open(PIPE_NAME, O_RDONLY);  // blocks until a writer appears
        if (pipe_fd < 0) {
            fprintf(stderr, "%% Failed to open pipe: %s\n", PIPE_NAME);
            break;
        }
        // Wrap the fd in a stdio stream so input is split on newlines.
        // The original raw read() neither NUL-terminated the buffer (undefined
        // behavior in strcspn) nor respected message/line boundaries.
        FILE *pipe_stream = fdopen(pipe_fd, "r");
        if (pipe_stream == NULL) {
            close(pipe_fd);
            break;
        }
        char line[1024];
        while (fgets(line, sizeof(line), pipe_stream) != NULL) {
            line[strcspn(line, "\n")] = '\0';  // strip trailing newline
            char *message = strdup(line);  // freed by the worker thread
            if (message == NULL) {
                fprintf(stderr, "%% Memory allocation error\n");
                continue;
            }
            pthread_t thread;
            if (pthread_create(&thread, NULL, process_message, message) != 0) {
                // Original ignored this; the message would have leaked.
                fprintf(stderr, "%% Failed to create worker thread\n");
                free(message);
                continue;
            }
            pthread_detach(thread);
        }
        fclose(pipe_stream);  // also closes pipe_fd
        sleep(POLL_INTERVAL);  // avoid a tight loop while no writer is attached
    }

    // Drain queued messages (up to 10s), then release all resources.
    rd_kafka_flush(rk, 10 * 1000);
    rd_kafka_topic_destroy(rkt);
    rd_kafka_destroy(rk);
    pthread_mutex_destroy(&kafka_lock);
    return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment