Skip to content

Instantly share code, notes, and snippets.

@spaskalev
Created June 14, 2021 21:02
Show Gist options
  • Save spaskalev/fdf7568905c5a37aa5b972bd67fa6ab1 to your computer and use it in GitHub Desktop.
Save spaskalev/fdf7568905c5a37aa5b972bd67fa6ab1 to your computer and use it in GitHub Desktop.
A forking writer pattern, e.g. a transaction processor with eventually-consistent non-blocking readers
#define _GNU_SOURCE
#include <unistd.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <limits.h>
#include <signal.h>
#include <fcntl.h>
#include <mqueue.h>
#include <netinet/ip.h> /* superset of previous */
/* Size of the per-connection command/reply buffer. */
enum { CMD_BUFFER_SIZE = 128 };

/*
 * Serve one client connection, then terminate the calling process.
 *
 * Protocol (decided by the first byte received):
 *   'g'  - reply with the decimal value of `state`, NUL terminator included.
 *   'i'  - post a one-byte message on `mqd` asking the writer to increment
 *          the state; nothing is sent back to the client.
 *   else - reply with "unknown command!", NUL terminator included.
 *
 * client_fd: connected socket; always closed before exiting.
 * state:     the value of the state as seen by this process.
 * mqd:       message queue descriptor used to reach the writer.
 *
 * Never returns.
 */
static void handle_client(int client_fd, int state, mqd_t mqd) {
    char buffer[CMD_BUFFER_SIZE] = {0};

    /* read cmd */
    if (recv(client_fd, buffer, sizeof(buffer), 0) < 0) {
        perror("recv");
        close(client_fd);
        exit(EXIT_FAILURE);
    }

    if (buffer[0] == 'i') {
        /* Only the first byte is forwarded: the queue's message size is 1. */
        int rc = mq_send(mqd, buffer, 1, 0);
        if (rc != 0) {
            perror("mq_send");
        }
        close(client_fd);
        exit(rc == 0 ? EXIT_SUCCESS : EXIT_FAILURE);
    }

    if (buffer[0] == 'g') {
        snprintf(buffer, sizeof(buffer), "%d", state);
    } else {
        snprintf(buffer, sizeof(buffer), "%s", "unknown command!");
    }

    /* The terminating NUL byte is sent as part of the reply. */
    if (send(client_fd, buffer, strlen(buffer) + 1, 0) < 0) {
        perror("send");
    }
    close(client_fd);
    exit(EXIT_SUCCESS);
}

/*
 * Accept connections on `sock` forever, forking one handler process per
 * connection. Every handler is given the same `state` value, i.e. the value
 * the state had when this acceptor process was forked.
 *
 * Never returns; the process ends when it is signalled by the writer or when
 * accept() fails with a non-transient error.
 */
static void run_acceptor(int sock, int state, mqd_t mqd) {
    for (;;) {
        struct sockaddr_in client_addr;
        /* accept() reads this as the buffer size, so it must be set first. */
        socklen_t client_addr_size = sizeof(client_addr);

        int client_fd = accept(sock, (struct sockaddr *)&client_addr,
                               &client_addr_size);
        if (client_fd < 0) {
            if (errno == EINTR || errno == ECONNABORTED) {
                continue;
            }
            perror("accept");
            exit(EXIT_FAILURE);
        }

        pid_t handler_pid = fork();
        if (handler_pid == 0) {
            /* handler child */
            handle_client(client_fd, state, mqd);
        }
        if (handler_pid < 0) {
            /* Could not spawn a handler: drop this connection, keep serving. */
            perror("fork");
        }
        /* The acceptor itself has no use for the connected socket. */
        close(client_fd);
    }
}

/*
 * Forking writer: the parent owns `state` and is the only process that
 * changes it. It forks an acceptor that serves TCP clients on port 1337 from
 * its own copy of the state. Each message received on the message queue makes
 * the parent increment the state, signal the current acceptor with SIGINT and
 * fork a replacement that sees the new value.
 *
 * Returns only by calling exit() after a setup or runtime failure.
 */
int main(void) {
    /* The cow state */
    int state = 0;
    const char *mq = "/3165c597-ab7c-4219-a3a8-6126e94d7ecb";
    char mq_buffer[1024] = {0};
    unsigned int mq_msg_prio = 0;

    /* Remove any queue left over from an earlier run; failure is expected
     * when none exists, so the result is deliberately not checked. */
    mq_unlink(mq);

    struct mq_attr mq_attr = {0};
    mq_attr.mq_maxmsg = 1;
    mq_attr.mq_msgsize = 1;
    /* The third argument is a permission mode, not an open flag. */
    mqd_t mqd = mq_open(mq, O_RDWR | O_CREAT, S_IRUSR | S_IWUSR, &mq_attr);
    if (mqd == (mqd_t)-1) {
        printf("failed to open queue\n");
        exit(EXIT_FAILURE);
    }

    /* Do not leave defunct child processes */
    signal(SIGCHLD, SIG_IGN);

    /* Create and bind a socket */
    int sock = socket(AF_INET, SOCK_STREAM, 0);
    if (sock < 0) {
        perror("socket");
        exit(EXIT_FAILURE);
    }

    int option = 1;
    /* Reuse existing socket with time_wait connections */
    if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &option, sizeof(option)) != 0) {
        /* Not fatal: bind() below reports the problem if it matters. */
        perror("setsockopt");
    }

    struct sockaddr_in addr = {0};
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = INADDR_ANY;
    addr.sin_port = htons(1337);
    if (bind(sock, (struct sockaddr *)&addr, sizeof(addr))) {
        printf("failed to bind socket!\n");
        exit(-1);
    }
    if (listen(sock, 128) != 0) {
        perror("listen");
        exit(EXIT_FAILURE);
    }

    for (;;) {
        pid_t acceptor_pid = fork();
        if (acceptor_pid < 0) {
            /* Must not fall through: kill(-1, ...) would signal every
             * process this user is allowed to signal. */
            perror("fork");
            exit(EXIT_FAILURE);
        }
        if (acceptor_pid == 0) {
            /* acceptor child */
            run_acceptor(sock, state, mqd);
        }

        /* receive a message (blocks) */
        ssize_t received;
        do {
            received = mq_receive(mqd, mq_buffer, sizeof(mq_buffer), &mq_msg_prio);
        } while (received < 0 && errno == EINTR);
        if (received < 0) {
            perror("mq_receive");
            kill(acceptor_pid, SIGINT);
            exit(EXIT_FAILURE);
        }

        /* change the state */
        state++;

        /* restart the acceptor */
        kill(acceptor_pid, SIGINT);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment