Created
June 11, 2024 21:46
-
-
Save jeansymolanza/0bca31db9728ae59fd6391ec5255d03b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

#include <librdkafka/rdkafka.h>
/* Path of the named pipe (FIFO) that the "start" mode reads messages from. */
#define PIPE_NAME "/path/to/pipe"
/* Flag file: "stop" mode creates it, and the "start" loop exits once it exists. */
#define STOP_FILE "/usr/ud/batch/flags/g1_kafka.STOP"
/* Seconds to sleep after closing the pipe before it is opened again. */
#define POLL_INTERVAL 1
/* Labelled as a thread-pool size; not referenced by the rest of the visible source. */
#define NUM_THREADS 4

/* Set to 1 by handle_signal(); the main loop polls it to shut down gracefully. */
volatile sig_atomic_t stop_requested = 0;
/* Held by process_message() around its produce and poll calls. */
pthread_mutex_t kafka_lock;
/**
 * Delivery report callback registered on the producer configuration.
 *
 * Logs a failure to stderr when the message carries an error, otherwise
 * prints the topic, partition and offset the message was written to.
 *
 * @param rk        Producer handle (unused).
 * @param rkmessage Delivered (or failed) message being reported.
 * @param opaque    Application opaque pointer (unused).
 */
static void dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) {
    (void)rk;
    (void)opaque;

    if (rkmessage->err) {
        fprintf(stderr, "Message delivery failed: %s\n", rd_kafka_err2str(rkmessage->err));
        return;
    }

    /*
     * librdkafka declares partition as int32_t and offset as int64_t. "%ld"
     * only matches int64_t where it happens to be `long`, so cast to types
     * whose printf conversions are fixed by the C standard.
     */
    printf("Message delivered to topic %s [%d] at offset %lld\n",
           rd_kafka_topic_name(rkmessage->rkt),
           (int)rkmessage->partition,
           (long long)rkmessage->offset);
}
// Signal handler to set the stop flag | |
void handle_signal(int signal) { | |
stop_requested = 1; | |
} | |
// Thread function to process messages | |
void* process_message(void* arg) { | |
char* message = (char*)arg; | |
rd_kafka_t *rk = (rd_kafka_t*)pthread_getspecific(pthread_self()); | |
rd_kafka_topic_t *rkt = (rd_kafka_topic_t*)pthread_getspecific(pthread_self() + 1); | |
pthread_mutex_lock(&kafka_lock); | |
// Produce message asynchronously | |
if (rd_kafka_produce( | |
rkt, RD_KAFKA_PARTITION_UA, | |
RD_KAFKA_MSG_F_COPY, | |
message, strlen(message), | |
NULL, 0, | |
NULL) == -1) { | |
fprintf(stderr, "%% Failed to produce to topic %s: %s\n", rd_kafka_topic_name(rkt), rd_kafka_err2str(rd_kafka_last_error())); | |
} | |
// Poll to handle delivery reports | |
rd_kafka_poll(rk, 0); | |
pthread_mutex_unlock(&kafka_lock); | |
free(message); | |
return NULL; | |
} | |
int main(int argc, char **argv) { | |
if (argc != 2 || (strcmp(argv[1], "start") != 0 && strcmp(argv[1], "stop") != 0)) { | |
fprintf(stderr, "Usage: %s {start|stop}\n", argv[0]); | |
return 1; | |
} | |
if (strcmp(argv[1], "start") == 0) { | |
const char *brokers = getenv("KAFKA_BOOTSTRAP_SERVERS"); | |
const char *topic = getenv("KAFKA_TOPIC"); | |
const char *jaas_conf = getenv("KAFKA_JAAS_CONF"); | |
const char *krb5_conf = getenv("KAFKA_KRB5_CONF"); | |
const char *keystore = getenv("KAFKA_KEYSTORE"); | |
const char *keystore_password = getenv("KAFKA_KEYSTORE_PASSWORD"); | |
const char *truststore = getenv("KAFKA_TRUSTSTORE"); | |
const char *truststore_password = getenv("KAFKA_TRUSTSTORE_PASSWORD"); | |
int missing_vars = 0; | |
if (!brokers) { | |
fprintf(stderr, "Missing environment variable: KAFKA_BOOTSTRAP_SERVERS\n"); | |
missing_vars++; | |
} | |
if (!topic) { | |
fprintf(stderr, "Missing environment variable: KAFKA_TOPIC\n"); | |
missing_vars++; | |
} | |
if (!jaas_conf) { | |
fprintf(stderr, "Missing environment variable: KAFKA_JAAS_CONF\n"); | |
missing_vars++; | |
} | |
if (!krb5_conf) { | |
fprintf(stderr, "Missing environment variable: KAFKA_KRB5_CONF\n"); | |
missing_vars++; | |
} | |
if (!keystore) { | |
fprintf(stderr, "Missing environment variable: KAFKA_KEYSTORE\n"); | |
missing_vars++; | |
} | |
if (!keystore_password) { | |
fprintf(stderr, "Missing environment variable: KAFKA_KEYSTORE_PASSWORD\n"); | |
missing_vars++; | |
} | |
if (!truststore) { | |
fprintf(stderr, "Missing environment variable: KAFKA_TRUSTSTORE\n"); | |
missing_vars++; | |
} | |
if (!truststore_password) { | |
fprintf(stderr, "Missing environment variable: KAFKA_TRUSTSTORE_PASSWORD\n"); | |
missing_vars++; | |
} | |
if (missing_vars > 0) { | |
return 1; | |
} | |
rd_kafka_t *rk; // Producer instance handle | |
rd_kafka_conf_t *conf; // Temporary configuration object | |
char errstr[512]; // librdkafka API error reporting buffer | |
// Create Kafka client configuration place-holder | |
conf = rd_kafka_conf_new(); | |
// Set the delivery report callback. | |
rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb); | |
// Increase buffer size and batch settings | |
rd_kafka_conf_set(conf, "queue.buffering.max.messages", "1000000", errstr, sizeof(errstr)); | |
rd_kafka_conf_set(conf, "batch.num.messages", "1000", errstr, sizeof(errstr)); | |
rd_kafka_conf_set(conf, "queue.buffering.max.ms", "1", errstr, sizeof(errstr)); | |
// SSL configuration | |
if (rd_kafka_conf_set(conf, "ssl.keystore.location", keystore, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { | |
fprintf(stderr, "%% ssl.keystore.location error: %s\n", errstr); | |
return 1; | |
} | |
if (rd_kafka_conf_set(conf, "ssl.keystore.password", keystore_password, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { | |
fprintf(stderr, "%% ssl.keystore.password error: %s\n", errstr); | |
return 1; | |
} | |
if (rd_kafka_conf_set(conf, "ssl.ca.location", truststore, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { | |
fprintf(stderr, "%% ssl.ca.location error: %s\n", errstr); | |
return 1; | |
} | |
// JAAS configuration | |
if (rd_kafka_conf_set(conf, "sasl.jaas.config", jaas_conf, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { | |
fprintf(stderr, "%% sasl.jaas.config error: %s\n", errstr); | |
return 1; | |
} | |
// Kerberos configuration | |
setenv("KRB5_CONFIG", krb5_conf, 1); | |
if (rd_kafka_conf_set(conf, "sasl.kerberos.service.name", "kafka", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { | |
fprintf(stderr, "%% sasl.kerberos.service.name error: %s\n", errstr); | |
return 1; | |
} | |
if (rd_kafka_conf_set(conf, "security.protocol", "SASL_SSL", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { | |
fprintf(stderr, "%% security.protocol error: %s\n", errstr); | |
return 1; | |
} | |
if (rd_kafka_conf_set(conf, "sasl.mechanism", "GSSAPI", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { | |
fprintf(stderr, "%% sasl.mechanism error: %s\n", errstr); | |
return 1; | |
} | |
// Create producer instance. | |
rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); | |
if (!rk) { | |
fprintf(stderr, "%% Failed to create new producer: %s\n", errstr); | |
return 1; | |
} | |
// Add brokers | |
if (rd_kafka_brokers_add(rk, brokers) == 0) { | |
fprintf(stderr, "%% No valid brokers specified\n"); | |
rd_kafka_destroy(rk); | |
return 1; | |
} | |
// Create topic object | |
rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, topic, NULL); | |
if (!rkt) { | |
fprintf(stderr, "%% Failed to create topic object: %s\n", rd_kafka_err2str(rd_kafka_last_error())); | |
rd_kafka_destroy(rk); | |
return 1; | |
} | |
// Create named pipe if it doesn't exist | |
mkfifo(PIPE_NAME, 0666); | |
// Set up signal handling for graceful shutdown | |
signal(SIGINT, handle_signal); | |
signal(SIGTERM, handle_signal); | |
// Initialize mutex | |
pthread_mutex_init(&kafka_lock, NULL); | |
// Main loop to process messages from the pipe | |
while (!stop_requested) { | |
if (access(STOP_FILE, F_OK) != -1) { | |
// Stop file exists, exit the loop | |
break; | |
} | |
int pipe_fd = open(PIPE_NAME, O_RDONLY); | |
if (pipe_fd < 0) { | |
fprintf(stderr, "%% Failed to open pipe: %s\n", PIPE_NAME); | |
break; | |
} | |
char line[1024]; | |
while (read(pipe_fd, line, sizeof(line)) > 0) { | |
// Remove newline character | |
line[strcspn(line, "\n")] = 0; | |
// Allocate memory for the message | |
char* message = strdup(line); | |
if (message == NULL) { | |
fprintf(stderr, "%% Memory allocation error\n"); | |
continue; | |
} | |
// Create a thread to process the message | |
pthread_t thread; | |
pthread_create(&thread, NULL, process_message, (void*)message); | |
pthread_detach(thread); | |
} | |
close(pipe_fd); | |
// Wait for a while before checking the pipe again | |
sleep(POLL_INTERVAL); | |
} | |
// Wait for all messages to be delivered | |
rd_kafka_flush(rk, 10 * 1000); | |
// Destroy topic object | |
rd_kafka_topic_destroy(rkt); | |
// Destroy producer instance | |
rd_kafka_destroy(rk); | |
// Destroy mutex | |
pthread_mutex_destroy(&kafka_lock); | |
} else if (strcmp(argv[1], "stop") == 0) { | |
// Create the stop file | |
FILE *stop_file = fopen(STOP_FILE, "w"); | |
if (stop_file) { | |
fclose(stop_file); | |
} | |
} | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment