Last active
June 17, 2024 19:28
-
-
Save rlapz/0a4138e0b5be7d62e8ebce96c78067d2 to your computer and use it in GitHub Desktop.
Generic Thread Pool
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <stdlib.h> | |
#include "thrd_pool.h" | |
typedef struct thrd_pool_job { | |
ThrdPoolFn func; | |
void *udata; | |
struct thrd_pool_job *next; | |
} ThrdPoolJob; | |
static int _create_threads(ThrdPool *t); | |
static int _jobs_init(ThrdPool *t, unsigned init_size, unsigned max_size); | |
static void _jobs_deinit(ThrdPool *t); | |
static int _jobs_enqueue(ThrdPool *t, ThrdPoolFn func, void *udata); | |
static ThrdPoolJob *_jobs_dequeue(ThrdPool *t); | |
static void _jobs_pool_push(ThrdPool *t, ThrdPoolJob *job); | |
static ThrdPoolJob *_jobs_pool_pop(ThrdPool *t); | |
static void _jobs_pool_destroy(ThrdPool *t); | |
static int _worker_fn(void *udata); | |
/* | |
* Public | |
*/ | |
int | |
thrd_pool_create(ThrdPool *t, unsigned thrd_size, unsigned jobs_init_size, unsigned jobs_max_size) | |
{ | |
if (_jobs_init(t, jobs_init_size, jobs_max_size) < 0) | |
return -1; | |
if (mtx_init(&t->mtx_general, mtx_plain) != 0) | |
goto err0; | |
if (cnd_init(&t->cond_job) != 0) | |
goto err1; | |
t->is_alive = 0; | |
t->workers_len = (thrd_size == 0)? 2 : thrd_size; | |
t->threads = malloc(sizeof(thrd_t) * t->workers_len); | |
if (t->threads == NULL) | |
goto err2; | |
if (_create_threads(t) < 0) | |
goto err3; | |
return 0; | |
err3: | |
free(t->threads); | |
err2: | |
cnd_destroy(&t->cond_job); | |
err1: | |
mtx_destroy(&t->mtx_general); | |
err0: | |
_jobs_deinit(t); | |
return -1; | |
} | |
void | |
thrd_pool_destroy(ThrdPool *t) | |
{ | |
if (t->threads != NULL) { | |
for (unsigned i = 0; i < t->workers_len; i++) | |
thrd_join(t->threads[i], NULL); | |
free(t->threads); | |
} | |
_jobs_deinit(t); | |
cnd_destroy(&t->cond_job); | |
mtx_destroy(&t->mtx_general); | |
} | |
void | |
thrd_pool_stop(ThrdPool *t) | |
{ | |
mtx_lock(&t->mtx_general); /* LOCK */ | |
t->is_alive = 0; | |
cnd_broadcast(&t->cond_job); | |
mtx_unlock(&t->mtx_general); /* UNLOCK */ | |
} | |
int | |
thrd_pool_add_job(ThrdPool *t, ThrdPoolFn func, void *udata) | |
{ | |
int ret = -1; | |
mtx_lock(&t->mtx_general); /* LOCK */ | |
ret = _jobs_enqueue(t, func, udata); | |
if (ret < 0) | |
goto out0; | |
cnd_signal(&t->cond_job); | |
out0: | |
mtx_unlock(&t->mtx_general); /* UNLOCK */ | |
return ret; | |
} | |
/* | |
* Private | |
*/ | |
static int | |
_create_threads(ThrdPool *t) | |
{ | |
t->is_alive = 1; | |
unsigned iter = 0; | |
while (iter < t->workers_len) { | |
if (thrd_create(&t->threads[iter], _worker_fn, t) != 0) | |
goto err0; | |
iter++; | |
} | |
return 0; | |
err0: | |
thrd_pool_stop(t); | |
while (iter--) | |
thrd_join(t->threads[iter], NULL); | |
return -1; | |
} | |
static int | |
_jobs_init(ThrdPool *t, unsigned init_size, unsigned max_size) | |
{ | |
if (init_size > max_size) | |
return -1; | |
t->jobs_pool = NULL; | |
for (unsigned i = 0; i < init_size; i++) { | |
ThrdPoolJob *const job = malloc(sizeof(ThrdPoolJob)); | |
if (job == NULL) { | |
_jobs_pool_destroy(t); | |
return -1; | |
} | |
_jobs_pool_push(t, job); | |
} | |
t->jobs_len = 0; | |
t->jobs_cap = init_size; | |
t->jobs_max = max_size; | |
t->job_first = NULL; | |
t->job_last = NULL; | |
return 0; | |
} | |
static void | |
_jobs_deinit(ThrdPool *t) | |
{ | |
ThrdPoolJob *job; | |
while ((job = _jobs_dequeue(t)) != NULL); | |
_jobs_pool_destroy(t); | |
t->job_first = NULL; | |
t->job_last = NULL; | |
} | |
static int | |
_jobs_enqueue(ThrdPool *t, ThrdPoolFn func, void *udata) | |
{ | |
/* reuse allocated memory if any */ | |
ThrdPoolJob *job = _jobs_pool_pop(t); | |
if (job == NULL) { | |
if (t->jobs_cap > t->jobs_max) | |
return -1; | |
job = malloc(sizeof(ThrdPoolJob)); | |
if (job == NULL) | |
return -1; | |
t->jobs_cap++; | |
} | |
if (t->job_last == NULL) | |
t->job_first = job; | |
else | |
t->job_last->next = job; | |
job->func = func; | |
job->udata = udata; | |
job->next = NULL; | |
t->job_last = job; | |
t->jobs_len++; | |
return 0; | |
} | |
static ThrdPoolJob * | |
_jobs_dequeue(ThrdPool *t) | |
{ | |
ThrdPoolJob *const job = t->job_first; | |
if ((job == NULL) || (t->jobs_len == 0)) | |
return NULL; | |
t->job_first = job->next; | |
if (t->job_first == NULL) | |
t->job_last = NULL; | |
_jobs_pool_push(t, job); | |
t->jobs_len--; | |
return job; | |
} | |
static inline void | |
_jobs_pool_push(ThrdPool *t, ThrdPoolJob *job) | |
{ | |
job->next = t->jobs_pool; | |
t->jobs_pool = job; | |
} | |
static inline ThrdPoolJob * | |
_jobs_pool_pop(ThrdPool *t) | |
{ | |
ThrdPoolJob *const job = t->jobs_pool; | |
if (job != NULL) | |
t->jobs_pool = job->next; | |
return job; | |
} | |
static void | |
_jobs_pool_destroy(ThrdPool *t) | |
{ | |
ThrdPoolJob *job; | |
while ((job = _jobs_pool_pop(t)) != NULL) | |
free(job); | |
t->jobs_pool = NULL; | |
} | |
static int | |
_worker_fn(void *udata) | |
{ | |
ThrdPool *const t = (ThrdPool *)udata; | |
ThrdPoolJob *job; | |
ThrdPoolFn tmp_func; | |
void *tmp_udata; | |
mtx_lock(&t->mtx_general); /* LOCK */ | |
while (t->is_alive) { | |
while ((job = _jobs_dequeue(t)) == NULL) { | |
cnd_wait(&t->cond_job, &t->mtx_general); | |
if (t->is_alive == 0) | |
goto out0; | |
} | |
/* copy job's data to prevent being reused (clobbered) by other thread */ | |
tmp_func = job->func; | |
tmp_udata = job->udata; | |
mtx_unlock(&t->mtx_general); /* UNLOCK */ | |
tmp_func(tmp_udata); | |
mtx_lock(&t->mtx_general); /* LOCK */ | |
cnd_signal(&t->cond_job); | |
} | |
out0: | |
mtx_unlock(&t->mtx_general); /* UNLOCK */ | |
return 0; | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#ifndef __THRD_POOL_H__ | |
#define __THRD_POOL_H__ | |
#include <threads.h> | |
typedef void (*ThrdPoolFn) (void *udata); | |
typedef struct thrd_pool_job ThrdPoolJob; | |
typedef struct thrd_pool { | |
volatile int is_alive; | |
ThrdPoolJob *job_first; | |
ThrdPoolJob *job_last; | |
ThrdPoolJob *jobs_pool; | |
unsigned jobs_len; | |
unsigned jobs_cap; | |
unsigned jobs_max; | |
unsigned workers_len; | |
cnd_t cond_job; | |
mtx_t mtx_general; | |
thrd_t *threads; | |
} ThrdPool; | |
int thrd_pool_create(ThrdPool *t, unsigned thrd_size, unsigned jobs_init_size, | |
unsigned jobs_max_size); | |
void thrd_pool_destroy(ThrdPool *t); | |
/* | |
* force stop: ignore pending job(s) | |
*/ | |
void thrd_pool_stop(ThrdPool *t); | |
int thrd_pool_add_job(ThrdPool *t, ThrdPoolFn func, void *udata); | |
#endif |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <stdio.h> | |
#include <unistd.h> | |
#include "thrd_pool.h" | |
static void | |
test0(void *udata) | |
{ | |
printf("%zu\n", (size_t)udata); | |
sleep(2); | |
} | |
int | |
main(void) | |
{ | |
ThrdPool tp; | |
if (thrd_pool_create(&tp, 8, 4, 1000) < 0) | |
return 1; | |
for (size_t i = 0; i < 10; i++) | |
thrd_pool_add_job(&tp, test0, (void *)i); | |
sleep(20); | |
thrd_pool_stop(&tp); | |
thrd_pool_destroy(&tp); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment