Skip to content

Instantly share code, notes, and snippets.

@Voldrix
Last active January 31, 2024 16:13
Show Gist options
  • Save Voldrix/5f48c9b729b9de1b36b465f4f9a0290a to your computer and use it in GitHub Desktop.
Save Voldrix/5f48c9b729b9de1b36b465f4f9a0290a to your computer and use it in GitHub Desktop.
multithreaded signal driven IO without multiplexing template in C
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <signal.h>
// Number of worker threads created by main; also the number of per-thread
// connection rings in queue[] and per-thread slices in buffers[].
#define THREADS 4
// Slots in each thread's ring. Must be a power of two because the worker
// wraps its tail index with `& (QUEUE_DEPTH - 1)`.
#define QUEUE_DEPTH 32 //power of 2
// Size in bytes of each thread's slice of buffers[].
#define BUFFER_SIZE 2048
// TCP port the listening socket is bound to.
#define SOCKET_PORT 8080
// Signal set holding SIGCONT (filled in by main): blocked process-wide and
// waited on by idle workers via sigwait().
sigset_t signal_mask;
// THREADS consecutive rings of QUEUE_DEPTH ints. Thread t owns
// queue[t * QUEUE_DEPTH .. t * QUEUE_DEPTH + QUEUE_DEPTH - 1].
// A slot holds an accepted socket descriptor, or 0 when it is empty.
int queue[THREADS * QUEUE_DEPTH] = {0};
// qtail[t]:  next slot worker t will consume from its own ring.
// qhead:     slot index main writes to next; a single index shared by every
//            ring, advanced after each full round over all threads.
// threadPtr: index of the thread whose ring receives the next connection.
int qtail[THREADS] = {0}, qhead = 0, threadPtr = 0;
// One BUFFER_SIZE-byte request buffer per worker, stored back to back.
char buffers[THREADS * BUFFER_SIZE];
// Fixed HTTP response (status line and headers, empty body) sent to every client.
const char *headers = "HTTP/1.0 200 OK\r\ncache-control: no-store\r\nAccess-Control-Allow-Origin: *\r\nConnection: close\r\ncontent-type: text/plain; charset=utf-8\r\ncontent-length: 0\r\n\r\n";
// Send the fixed HTTP response in `headers` to the client, then close the
// connection.
//
// fd   - connected descriptor to answer; it is always closed before returning.
// buff - the request bytes read by the caller. Not inspected or modified here;
//        the response is written straight from `headers`, so no assumption is
//        made about how large `buff` is.
void parse_request(int fd, char *buff) {
    (void)buff;

    const char *cursor = headers;
    size_t remaining = strlen(headers);

    // write() may accept fewer bytes than requested; keep going until the
    // whole response is out or the descriptor stops accepting data.
    while (remaining > 0) {
        ssize_t sent = write(fd, cursor, remaining);
        if (sent <= 0) //error or no progress: give up on this client
            break;
        cursor += sent;
        remaining -= (size_t)sent;
    }

    close(fd);
}
void* worker_thread(void* threadNo) {
int threadNum = (long long)threadNo;
int fd, bytesRead, sig_caught, *tail = &qtail[threadNum], *_queue = &queue[QUEUE_DEPTH * threadNum];
char *buff = &buffers[BUFFER_SIZE * threadNum];
while(1) {
if(_queue[*tail] == 0) //no new connection queued
sigwait(&signal_mask, &sig_caught);
fd = _queue[*tail];
_queue[*tail] = 0;
*tail += 1;
*tail &= QUEUE_DEPTH - 1; //rollover
bytesRead = read(fd, buff, sizeof(buff) - 1);
if(bytesRead <= 0) //read error or EOF
continue;
buff[bytesRead] = 0; //null terminate str
parse_request(fd, buff);
}
return 0;
}
// Program entry point: daemonize, start the worker pool, open the listening
// socket on SOCKET_PORT and hand accepted connections to the workers in
// round-robin order.
//
// Returns 1 if any setup step fails; otherwise the accept loop runs forever.
int main(void) {
    if (daemon(0, 0) < 0)
        return 1;

    // A client that disconnects before the response is written must not take
    // the whole process down with SIGPIPE; write() reports an error instead.
    signal(SIGPIPE, SIG_IGN);

    sigemptyset(&signal_mask);
    sigaddset(&signal_mask, SIGCONT);
    // Block SIGCONT before any thread exists so every worker inherits the mask
    // and can collect the signal with sigwait().
    if (sigprocmask(SIG_SETMASK, &signal_mask, NULL) < 0)
        return 1;

    //THREAD POOL
    pthread_t thread[THREADS];
    for (long long coreOffset = 0; coreOffset < THREADS; coreOffset++) {
        if (pthread_create(&thread[coreOffset], NULL, &worker_thread, (void*)coreOffset) != 0)
            return 1;
    }

    //NETWORKING LISTENING SOCKET
    int opt = 1;
    int listen_sd = socket(AF_INET, SOCK_STREAM, 0);
    if (listen_sd < 0)
        return 1;
    // setsockopt takes exactly one option name per call; the names are plain
    // integers, not bit flags, so they cannot be OR-ed together.
    if (setsockopt(listen_sd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0)
        return 1;
    if (setsockopt(listen_sd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) < 0)
        return 1;

    struct sockaddr_in address;
    memset(&address, 0, sizeof(address));
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(SOCKET_PORT);
    if (bind(listen_sd, (struct sockaddr*)&address, sizeof(address)) < 0)
        return 1;
    if (listen(listen_sd, 32) < 0)
        return 1;

    //network listen loop
    for (;;) {
        // The peer address is not used, so none is requested.
        int new_sd = accept(listen_sd, NULL, NULL);
        if (new_sd == -1)
            continue;

        int *slot = &queue[threadPtr * QUEUE_DEPTH + qhead];
        if (*slot != 0) {
            // This worker is a full ring behind. Overwriting the slot would
            // leak the descriptor already queued there, so drop the new one.
            close(new_sd);
        } else {
            *slot = new_sd; //add new conn to queue
            if (qtail[threadPtr] == qhead) //worker has reached this slot (may be asleep), send signal
                pthread_kill(thread[threadPtr], SIGCONT);
        }

        // Next thread; after a full round over all threads move to the next
        // slot index, wrapping at QUEUE_DEPTH.
        threadPtr += 1;
        if (threadPtr == THREADS) {
            threadPtr = 0;
            qhead += 1;
            if (qhead == QUEUE_DEPTH)
                qhead = 0;
        }
    }
    // Not reached: the accept loop above never exits.
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment