Skip to content

Instantly share code, notes, and snippets.

@enfiskutensykkel
Created December 5, 2023 19:48
Show Gist options
  • Save enfiskutensykkel/9e19f9473acba0a60cb8700d2ab24d25 to your computer and use it in GitHub Desktop.
Save enfiskutensykkel/9e19f9473acba0a60cb8700d2ab24d25 to your computer and use it in GitHub Desktop.
Thread pool example
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>
#include <pthread.h>
#include <stdio.h>
#include <signal.h>
#include <unistd.h>
/* Sizing of the demo: how many worker threads to start and how many jobs to schedule. */
enum {
    NUM_WORKERS = 8,
    NUM_JOBS = 400
};
/*
 * container_of - map a pointer to a struct member back to the enclosing struct.
 *
 * ptr:    pointer to the member
 * type:   type of the enclosing struct
 * member: name of the member inside type
 *
 * The temporary gives a compile-time diagnostic when ptr does not point to
 * the member's type. __typeof__ is used instead of typeof because the plain
 * keyword is unavailable in strict ISO modes before C23 (e.g. -std=c11).
 * Relies on GNU statement expressions (GCC/Clang).
 */
#define container_of(ptr, type, member) ({ \
    const __typeof__( ((type *)0)->member ) *container_of_mptr_ = (ptr); \
    (type *)( (char *)container_of_mptr_ - offsetof(type, member) ); })
/*
 * Link node of a doubly linked list. It is embedded inside the object that
 * is to be queued rather than pointing at it.
 */
struct queue_entry {
    struct queue_entry *next, *prev; /* successor and predecessor in the list */
};
/*
 * One unit of work for the pool: a callback plus its argument.
 * Allocated by call_later(); a worker frees it after func has returned.
 */
struct job {
/* callback executed by a worker thread */
void (*func)(void*);
/* handed to func unchanged; the pool itself never frees it */
void *arg;
/* queue linkage; container_of() maps it back to the enclosing job */
struct queue_entry entry;
};
/* Shutdown flag: set by graceful_shutdown() and checked by dequeue(), both while holding mtx. */
bool stop_threads = false;
/* Guards the job queue (head and every linked entry) as well as stop_threads. */
pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
/* Signalled by enqueue() when a job is added; broadcast by graceful_shutdown(). */
pthread_cond_t cv = PTHREAD_COND_INITIALIZER;
/*
 * Sentinel of the circular, doubly linked job queue. The queue is empty
 * when head.next == &head; new jobs are linked in before head (at the tail)
 * and removed from head.next (the front).
 */
struct queue_entry head = {
.next = &head,
.prev = &head
};
/*
 * Append a job to the tail of the shared queue and wake one waiting thread.
 *
 * job: the job to queue; its embedded entry is linked into the list, so the
 *      job must stay alive until it has been dequeued.
 */
void enqueue(struct job *job)
{
    struct queue_entry *node = &job->entry;
    struct queue_entry *tail;

    pthread_mutex_lock(&mtx);

    /* Splice the node in between the current tail and the sentinel. */
    tail = head.prev;
    node->prev = tail;
    node->next = &head;
    tail->next = node;
    head.prev = node;

    pthread_cond_signal(&cv);
    pthread_mutex_unlock(&mtx);
}
/*
 * Remove and return the job at the front of the queue, blocking on cv while
 * the queue is empty.
 *
 * Returns NULL once stop_threads is set. The shutdown flag takes precedence
 * over pending work, so jobs still queued at that point are left on the list.
 */
struct job * dequeue()
{
    struct queue_entry *first;
    struct queue_entry *successor;

    pthread_mutex_lock(&mtx);

    for (;;) {
        if (stop_threads) {
            pthread_mutex_unlock(&mtx);
            return NULL;
        }
        if (head.next != &head) {
            break;
        }
        /* Nothing queued and no shutdown requested: sleep until woken. */
        pthread_cond_wait(&cv, &mtx);
    }

    /* Unlink the front node and clear its links before handing it out. */
    first = head.next;
    successor = first->next;
    head.next = successor;
    successor->prev = &head;
    first->next = NULL;
    first->prev = NULL;

    pthread_mutex_unlock(&mtx);

    return container_of(first, struct job, entry);
}
/*
 * Thread start routine for pool workers.
 *
 * Repeatedly takes a job from the queue, runs its callback and frees the job.
 * Leaves the loop when dequeue() returns NULL.
 *
 * unused: start argument required by the pthread start-routine signature;
 *         ignored. Declaring it gives the function the exact type
 *         pthread_create() calls it through, instead of being invoked with
 *         an argument it was not defined to take.
 *
 * Returns NULL always.
 */
void * worker_thread(void *unused)
{
    struct job *job;

    (void) unused;

    while ((job = dequeue()) != NULL) {
        job->func(job->arg);
        free(job);
    }

    return NULL;
}
/*
 * Schedule func(arg) to be run by a worker thread.
 *
 * func: callback to run
 * arg:  pointer passed to func unchanged; the caller keeps ownership
 *
 * If the job record cannot be allocated the request is dropped silently;
 * the function has no way to report the failure.
 */
void call_later(void (*func)(void*), void *arg)
{
    struct job *item = malloc(sizeof *item);

    if (item == NULL) {
        return;
    }

    item->func = func;
    item->arg = arg;
    enqueue(item);
}
/*
 * Ask all worker threads to exit.
 *
 * Sets stop_threads while holding mtx and wakes every thread waiting on cv.
 * dequeue() returns NULL once the flag is set, even if jobs are still
 * queued, so a worker finishes the job it is running and then leaves its loop.
 *
 * NOTE(review): this locks a mutex and broadcasts a condition variable;
 * POSIX does not list either call as async-signal-safe, so this should be
 * called from ordinary thread context rather than from inside a signal handler.
 */
void graceful_shutdown()
{
pthread_mutex_lock(&mtx);
stop_threads = true;
/* broadcast rather than signal: every waiting thread has to see the flag */
pthread_cond_broadcast(&cv);
pthread_mutex_unlock(&mtx);
}
/* Argument record for foo(). */
struct foo_arg {
/* number printed by foo() */
int value;
};
void foo(void *arg)
{
struct foo_arg *data = (struct foo_arg*) arg;
/* pretend to do some heavy work */
usleep(5000);
printf("Hello world! %d\n", data->value);
}
int main()
{
int i;
struct foo_arg *args;
pthread_t workers[NUM_WORKERS];
signal(SIGINT, (void (*)(int)) graceful_shutdown);
args = calloc(sizeof(struct foo_arg), NUM_JOBS);
if (args == NULL) {
return 2;
}
for (i = 0; i < NUM_JOBS; ++i) {
args[i].value = i;
}
for (i = 0; i < NUM_WORKERS; ++i) {
pthread_create(&workers[i], NULL, (void * (*)(void*)) worker_thread, NULL);
}
for (i = 0; i < NUM_JOBS; ++i) {
call_later(foo, &args[i]);
usleep(500);
}
printf("Done scheduling jobs\n");
for (i = 0; i < NUM_WORKERS; ++i) {
pthread_join(workers[i], NULL);
}
free(args);
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment