Created
June 14, 2021 21:02
-
-
Save spaskalev/fdf7568905c5a37aa5b972bd67fa6ab1 to your computer and use it in GitHub Desktop.
A forking writer pattern, e.g. a transaction processor with eventually-consistent non-blocking readers
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#define _GNU_SOURCE | |
#include <unistd.h> | |
#include <sys/mman.h> | |
#include <sys/types.h> | |
#include <sys/stat.h> | |
#include <sys/socket.h> | |
#include <stdio.h> | |
#include <stdlib.h> | |
#include <string.h> | |
#include <errno.h> | |
#include <limits.h> | |
#include <signal.h> | |
#include <fcntl.h> | |
#include <mqueue.h> | |
#include <netinet/ip.h> /* superset of previous */ | |
/*
 * Serve a single client connection inside a handler process, then exit.
 *
 * Protocol (first byte of the request selects the command):
 *   'g' - reply with the decimal value of `state` (NUL-terminated);
 *   'i' - post a one-byte message on `mqd` to ask the writer to increment;
 *   anything else - reply with "unknown command!" (NUL-terminated).
 *
 * `state` is the value this process inherited across fork(), so a 'g'
 * reply reflects the state as of the moment the acceptor was forked.
 * This function never returns.
 */
static void handle_client(int client_fd, int state, mqd_t mqd)
{
    static const char unknown_reply[] = "unknown command!";
    char buffer[128] = {0};
    int status = EXIT_SUCCESS;

    /* Read at most sizeof buffer - 1 bytes so the buffer stays NUL-terminated. */
    ssize_t received = recv(client_fd, buffer, sizeof buffer - 1, 0);
    if (received < 0) {
        perror("recv");
        close(client_fd);
        exit(EXIT_FAILURE);
    }

    if (buffer[0] == 'g') {
        snprintf(buffer, sizeof buffer, "%d", state);
        /* The terminating NUL is part of the reply. */
        if (send(client_fd, buffer, strlen(buffer) + 1, 0) == -1) {
            perror("send");
            status = EXIT_FAILURE;
        }
    } else if (buffer[0] == 'i') {
        /* Queue messages are exactly one byte (mq_msgsize == 1). */
        if (mq_send(mqd, buffer, 1, 0) == -1) {
            perror("mq_send");
            status = EXIT_FAILURE;
        }
    } else {
        if (send(client_fd, unknown_reply, sizeof unknown_reply, 0) == -1) {
            perror("send");
            status = EXIT_FAILURE;
        }
    }

    close(client_fd);
    exit(status);
}

/*
 * Acceptor process body: accept connections on `sock` forever and fork one
 * handler process per connection. The acceptor is stopped from outside by
 * the writer (SIGINT); this function never returns.
 */
static void run_acceptor(int sock, int state, mqd_t mqd)
{
    for (;;) {
        struct sockaddr_in client_addr;
        /* accept() reads this as the capacity of client_addr, so it must be set. */
        socklen_t client_addr_size = sizeof client_addr;

        int client_fd = accept(sock, (struct sockaddr *)&client_addr, &client_addr_size);
        if (client_fd == -1) {
            /* Do not fork a handler for a connection that does not exist. */
            if (errno != EINTR) {
                perror("accept");
            }
            continue;
        }

        pid_t handler_pid = fork();
        if (handler_pid == 0) {
            handle_client(client_fd, state, mqd);
        }
        if (handler_pid == -1) {
            perror("fork");
        }
        /* The acceptor keeps no reference to the client socket. */
        close(client_fd);
    }
}

/*
 * Forking writer: the parent owns `state` and is the only process that
 * changes it. It forks an acceptor that serves readers from its own copy of
 * the state, blocks until an increment request arrives on the message queue,
 * applies it, then replaces the acceptor with a freshly forked one that
 * carries the new value.
 */
int main(void)
{
    /* The state; every forked acceptor gets its own copy of the current value. */
    int state = 0;
    const char *mq = "/3165c597-ab7c-4219-a3a8-6126e94d7ecb";
    char mq_buffer[1024] = {0};
    unsigned int mq_msg_prio = 0;

    /* Start from a fresh queue; failure here (e.g. no such queue) is expected. */
    mq_unlink(mq);

    struct mq_attr mq_attr = { .mq_maxmsg = 1, .mq_msgsize = 1 };
    /* The third argument is the permission mode, not open flags. */
    mqd_t mqd = mq_open(mq, O_RDWR | O_CREAT, S_IRUSR | S_IWUSR, &mq_attr);
    if (mqd == (mqd_t)-1) {
        perror("failed to open queue");
        return EXIT_FAILURE;
    }

    /* Do not leave defunct child processes */
    signal(SIGCHLD, SIG_IGN);

    /* Create and bind a socket */
    int sock = socket(AF_INET, SOCK_STREAM, 0);
    if (sock == -1) {
        perror("failed to create socket");
        return EXIT_FAILURE;
    }

    /* Reuse existing socket with time_wait connections */
    int option = 1;
    if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &option, sizeof option) == -1) {
        perror("setsockopt");
    }

    struct sockaddr_in addr = {0};
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port = htons(1337);
    if (bind(sock, (struct sockaddr *)&addr, sizeof addr) == -1) {
        perror("failed to bind socket!");
        return EXIT_FAILURE;
    }
    if (listen(sock, 128) == -1) {
        perror("failed to listen on socket");
        return EXIT_FAILURE;
    }

    for (;;) {
        pid_t acceptor_pid = fork();
        if (acceptor_pid == -1) {
            /* Must not fall through: kill(-1, ...) would signal every process. */
            perror("fork");
            return EXIT_FAILURE;
        }
        if (acceptor_pid == 0) {
            run_acceptor(sock, state, mqd); /* never returns */
        }

        /* receive a message (blocks); retry if interrupted by a signal */
        ssize_t received;
        do {
            received = mq_receive(mqd, mq_buffer, sizeof mq_buffer, &mq_msg_prio);
        } while (received == -1 && errno == EINTR);

        if (received == -1) {
            perror("mq_receive");
            kill(acceptor_pid, SIGINT);
            return EXIT_FAILURE;
        }

        /* change the state */
        state++;

        /* restart the acceptor so new readers see the updated state */
        kill(acceptor_pid, SIGINT);
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment