Skip to content

Instantly share code, notes, and snippets.

@rlapz
Last active June 17, 2024 19:28
Show Gist options
  • Save rlapz/0a4138e0b5be7d62e8ebce96c78067d2 to your computer and use it in GitHub Desktop.
Save rlapz/0a4138e0b5be7d62e8ebce96c78067d2 to your computer and use it in GitHub Desktop.
Generic Thread Pool
#include <stdlib.h>
#include "thrd_pool.h"
typedef struct thrd_pool_job {
ThrdPoolFn func;
void *udata;
struct thrd_pool_job *next;
} ThrdPoolJob;
static int _create_threads(ThrdPool *t);
static int _jobs_init(ThrdPool *t, unsigned init_size, unsigned max_size);
static void _jobs_deinit(ThrdPool *t);
static int _jobs_enqueue(ThrdPool *t, ThrdPoolFn func, void *udata);
static ThrdPoolJob *_jobs_dequeue(ThrdPool *t);
static void _jobs_pool_push(ThrdPool *t, ThrdPoolJob *job);
static ThrdPoolJob *_jobs_pool_pop(ThrdPool *t);
static void _jobs_pool_destroy(ThrdPool *t);
static int _worker_fn(void *udata);
/*
* Public
*/
int
thrd_pool_create(ThrdPool *t, unsigned thrd_size, unsigned jobs_init_size, unsigned jobs_max_size)
{
if (_jobs_init(t, jobs_init_size, jobs_max_size) < 0)
return -1;
if (mtx_init(&t->mtx_general, mtx_plain) != 0)
goto err0;
if (cnd_init(&t->cond_job) != 0)
goto err1;
t->is_alive = 0;
t->workers_len = (thrd_size == 0)? 2 : thrd_size;
t->threads = malloc(sizeof(thrd_t) * t->workers_len);
if (t->threads == NULL)
goto err2;
if (_create_threads(t) < 0)
goto err3;
return 0;
err3:
free(t->threads);
err2:
cnd_destroy(&t->cond_job);
err1:
mtx_destroy(&t->mtx_general);
err0:
_jobs_deinit(t);
return -1;
}
void
thrd_pool_destroy(ThrdPool *t)
{
if (t->threads != NULL) {
for (unsigned i = 0; i < t->workers_len; i++)
thrd_join(t->threads[i], NULL);
free(t->threads);
}
_jobs_deinit(t);
cnd_destroy(&t->cond_job);
mtx_destroy(&t->mtx_general);
}
void
thrd_pool_stop(ThrdPool *t)
{
mtx_lock(&t->mtx_general); /* LOCK */
t->is_alive = 0;
cnd_broadcast(&t->cond_job);
mtx_unlock(&t->mtx_general); /* UNLOCK */
}
int
thrd_pool_add_job(ThrdPool *t, ThrdPoolFn func, void *udata)
{
int ret = -1;
mtx_lock(&t->mtx_general); /* LOCK */
ret = _jobs_enqueue(t, func, udata);
if (ret < 0)
goto out0;
cnd_signal(&t->cond_job);
out0:
mtx_unlock(&t->mtx_general); /* UNLOCK */
return ret;
}
/*
* Private
*/
static int
_create_threads(ThrdPool *t)
{
t->is_alive = 1;
unsigned iter = 0;
while (iter < t->workers_len) {
if (thrd_create(&t->threads[iter], _worker_fn, t) != 0)
goto err0;
iter++;
}
return 0;
err0:
thrd_pool_stop(t);
while (iter--)
thrd_join(t->threads[iter], NULL);
return -1;
}
static int
_jobs_init(ThrdPool *t, unsigned init_size, unsigned max_size)
{
if (init_size > max_size)
return -1;
t->jobs_pool = NULL;
for (unsigned i = 0; i < init_size; i++) {
ThrdPoolJob *const job = malloc(sizeof(ThrdPoolJob));
if (job == NULL) {
_jobs_pool_destroy(t);
return -1;
}
_jobs_pool_push(t, job);
}
t->jobs_len = 0;
t->jobs_cap = init_size;
t->jobs_max = max_size;
t->job_first = NULL;
t->job_last = NULL;
return 0;
}
static void
_jobs_deinit(ThrdPool *t)
{
ThrdPoolJob *job;
while ((job = _jobs_dequeue(t)) != NULL);
_jobs_pool_destroy(t);
t->job_first = NULL;
t->job_last = NULL;
}
static int
_jobs_enqueue(ThrdPool *t, ThrdPoolFn func, void *udata)
{
/* reuse allocated memory if any */
ThrdPoolJob *job = _jobs_pool_pop(t);
if (job == NULL) {
if (t->jobs_cap > t->jobs_max)
return -1;
job = malloc(sizeof(ThrdPoolJob));
if (job == NULL)
return -1;
t->jobs_cap++;
}
if (t->job_last == NULL)
t->job_first = job;
else
t->job_last->next = job;
job->func = func;
job->udata = udata;
job->next = NULL;
t->job_last = job;
t->jobs_len++;
return 0;
}
static ThrdPoolJob *
_jobs_dequeue(ThrdPool *t)
{
ThrdPoolJob *const job = t->job_first;
if ((job == NULL) || (t->jobs_len == 0))
return NULL;
t->job_first = job->next;
if (t->job_first == NULL)
t->job_last = NULL;
_jobs_pool_push(t, job);
t->jobs_len--;
return job;
}
static inline void
_jobs_pool_push(ThrdPool *t, ThrdPoolJob *job)
{
job->next = t->jobs_pool;
t->jobs_pool = job;
}
static inline ThrdPoolJob *
_jobs_pool_pop(ThrdPool *t)
{
ThrdPoolJob *const job = t->jobs_pool;
if (job != NULL)
t->jobs_pool = job->next;
return job;
}
static void
_jobs_pool_destroy(ThrdPool *t)
{
ThrdPoolJob *job;
while ((job = _jobs_pool_pop(t)) != NULL)
free(job);
t->jobs_pool = NULL;
}
static int
_worker_fn(void *udata)
{
ThrdPool *const t = (ThrdPool *)udata;
ThrdPoolJob *job;
ThrdPoolFn tmp_func;
void *tmp_udata;
mtx_lock(&t->mtx_general); /* LOCK */
while (t->is_alive) {
while ((job = _jobs_dequeue(t)) == NULL) {
cnd_wait(&t->cond_job, &t->mtx_general);
if (t->is_alive == 0)
goto out0;
}
/* copy job's data to prevent being reused (clobbered) by other thread */
tmp_func = job->func;
tmp_udata = job->udata;
mtx_unlock(&t->mtx_general); /* UNLOCK */
tmp_func(tmp_udata);
mtx_lock(&t->mtx_general); /* LOCK */
cnd_signal(&t->cond_job);
}
out0:
mtx_unlock(&t->mtx_general); /* UNLOCK */
return 0;
}
#ifndef __THRD_POOL_H__
#define __THRD_POOL_H__
#include <threads.h>
typedef void (*ThrdPoolFn) (void *udata);
typedef struct thrd_pool_job ThrdPoolJob;
typedef struct thrd_pool {
volatile int is_alive;
ThrdPoolJob *job_first;
ThrdPoolJob *job_last;
ThrdPoolJob *jobs_pool;
unsigned jobs_len;
unsigned jobs_cap;
unsigned jobs_max;
unsigned workers_len;
cnd_t cond_job;
mtx_t mtx_general;
thrd_t *threads;
} ThrdPool;
int thrd_pool_create(ThrdPool *t, unsigned thrd_size, unsigned jobs_init_size,
unsigned jobs_max_size);
void thrd_pool_destroy(ThrdPool *t);
/*
* force stop: ignore pending job(s)
*/
void thrd_pool_stop(ThrdPool *t);
int thrd_pool_add_job(ThrdPool *t, ThrdPoolFn func, void *udata);
#endif
#include <stdio.h>
#include <unistd.h>
#include "thrd_pool.h"
static void
test0(void *udata)
{
printf("%zu\n", (size_t)udata);
sleep(2);
}
int
main(void)
{
ThrdPool tp;
if (thrd_pool_create(&tp, 8, 4, 1000) < 0)
return 1;
for (size_t i = 0; i < 10; i++)
thrd_pool_add_job(&tp, test0, (void *)i);
sleep(20);
thrd_pool_stop(&tp);
thrd_pool_destroy(&tp);
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment