Skip to content

Instantly share code, notes, and snippets.

@voidexp
Created March 5, 2016 23:50
Show Gist options
  • Save voidexp/5240f64edabe9dfb40ce to your computer and use it in GitHub Desktop.
Save voidexp/5240f64edabe9dfb40ce to your computer and use it in GitHub Desktop.
TaskQueue
#include "error.h"
#include "mem.h"
#include "queue.h"
#include <assert.h>
#include <pthread.h>
#include <stdio.h>
#include <string.h>
#include <time.h>
#include <sys/time.h>
/**
 * Bounded FIFO of fixed-size elements, shared between threads.
 *
 * Elements are stored by value, back to back, in `data`; index 0 holds the
 * oldest element and new elements are appended at index `len`.
 */
struct GK_Queue {
    /* Element storage. Declared as a byte pointer so that offsets such as
     * `data + elem_size * i` are plain ISO C pointer arithmetic rather than
     * arithmetic on `void *` (a compiler extension). */
    unsigned char *data;
    size_t len;                    /* number of elements currently stored */
    size_t size;                   /* capacity, in elements */
    size_t elem_size;              /* size of one element, in bytes */
    pthread_mutex_t mutex;         /* taken by get/add/len around queue access */
    pthread_cond_t cond_has_data;  /* broadcast after an element is added */
    pthread_cond_t cond_full;      /* broadcast after an element is removed */
};
/**
 * Create a queue able to hold up to `size` elements of `elem_size` bytes each.
 *
 * Returns the new queue, or NULL when the requested dimensions are invalid,
 * memory cannot be allocated, or a synchronization primitive fails to
 * initialize. Release the result with gk_queue_free().
 */
struct GK_Queue*
gk_queue_new(size_t elem_size, size_t size)
{
    assert(elem_size > 0);
    assert(size > 0);

    // with asserts compiled out, still refuse empty dimensions and byte
    // counts that would wrap around size_t
    if (elem_size == 0 || size == 0 || size > (size_t)-1 / elem_size)
        return NULL;

    struct GK_Queue *q = gk_new(struct GK_Queue);
    if (!q)
        return NULL;

    if (!(q->data = gk_alloc0(elem_size * size))) {
        gk_free(q);
        return NULL;
    }
    q->len = 0;
    q->size = size;
    q->elem_size = elem_size;

    if (pthread_mutex_init(&q->mutex, NULL) != 0)
        goto mutex_error;
    if (pthread_cond_init(&q->cond_has_data, NULL) != 0)
        goto cond_has_data_error;
    // gk_queue_add() waits on cond_full and gk_queue_get() broadcasts it,
    // so it has to be initialized just like cond_has_data
    if (pthread_cond_init(&q->cond_full, NULL) != 0)
        goto cond_full_error;
    return q;

cond_full_error:
    pthread_cond_destroy(&q->cond_has_data);
cond_has_data_error:
    pthread_mutex_destroy(&q->mutex);
mutex_error:
    gk_free(q->data);
    gk_free(q);
    return NULL;
}

/**
 * Destroy a queue created by gk_queue_new() and release its storage.
 *
 * Passing NULL is a no-op. Both condition variables and the mutex are
 * destroyed, mirroring what gk_queue_new() initializes.
 */
void
gk_queue_free(struct GK_Queue *q)
{
    if (!q)
        return;

    pthread_cond_destroy(&q->cond_full);
    pthread_cond_destroy(&q->cond_has_data);
    pthread_mutex_destroy(&q->mutex);
    gk_free(q->data);
    gk_free(q);
}
/**
 * Remove the oldest element from the queue, blocking while the queue is empty.
 *
 * Returns a pointer to the removed element. The pointer refers to a slot
 * inside the queue's own storage (the first slot past the remaining
 * elements), so its contents are only guaranteed until the next
 * gk_queue_add() on this queue writes to that slot; callers that need the
 * value for longer should copy it out right away.
 */
void*
gk_queue_get(struct GK_Queue *q)
{
    pthread_mutex_lock(&q->mutex);

    // block until the queue has some data in; gk_queue_add() broadcasts
    // cond_has_data while holding the same mutex, so a plain wait is enough
    // and no polling timeout is needed
    while (q->len == 0) {
        pthread_cond_wait(&q->cond_has_data, &q->mutex);
    }

    unsigned char *base = (unsigned char *)q->data;
    const size_t width = q->elem_size;

    q->len--;

    // Rotate the oldest element behind the remaining ones by swapping it with
    // each successor in turn. Every remaining element moves one slot towards
    // the front (including the last one), and the removed element ends up
    // intact at index `len`, outside the live range.
    for (size_t i = 0; i < q->len; i++) {
        unsigned char *cur = base + width * i;
        unsigned char *next = cur + width;
        for (size_t b = 0; b < width; b++) {
            unsigned char tmp = cur[b];
            cur[b] = next[b];
            next[b] = tmp;
        }
    }
    void *elem = base + width * q->len;

    // a slot has been freed: wake up producers blocked in gk_queue_add()
    pthread_cond_broadcast(&q->cond_full);
    pthread_mutex_unlock(&q->mutex);
    return elem;
}
/**
 * Append a copy of `*elem` to the end of the queue, blocking while the queue
 * is full.
 *
 * `elem` must point to at least `elem_size` readable bytes (the element size
 * given to gk_queue_new()); the bytes are copied into the queue's storage.
 */
void
gk_queue_add(struct GK_Queue *q, void *elem)
{
    pthread_mutex_lock(&q->mutex);

    // block while the queue is full; gk_queue_get() broadcasts cond_full
    // after removing an element
    while (q->len == q->size) {
        pthread_cond_wait(&q->cond_full, &q->mutex);
    }

    // compute the destination through a byte pointer: arithmetic on `void *`
    // is not defined by ISO C
    unsigned char *slot = (unsigned char *)q->data + q->elem_size * q->len;
    memcpy(slot, elem, q->elem_size);
    q->len++;

    // wake up consumers blocked in gk_queue_get()
    pthread_cond_broadcast(&q->cond_has_data);
    pthread_mutex_unlock(&q->mutex);
}
/**
 * Report how many elements are stored in the queue at the time of the call.
 *
 * The count is read while holding the queue mutex; it may already be out of
 * date by the time the caller looks at it if other threads use the queue.
 */
size_t
gk_queue_len(struct GK_Queue *q)
{
    pthread_mutex_lock(&q->mutex);
    const size_t count = q->len;
    pthread_mutex_unlock(&q->mutex);

    return count;
}
#include "mem.h"
#include "queue.h"
#include "task_queue.h"
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
// Bookkeeping for one worker thread of a task queue.
struct Worker {
// thread handle, filled in by pthread_create() in gk_task_queue_new()
pthread_t thread;
// set to true once pthread_create() succeeded; gk_task_queue_free() only
// joins workers that have this flag set
bool initialized;
// task queue this worker fetches its tasks from
struct GK_TaskQueue *queue;
};
// One unit of work stored by value in the task queue.
struct Task {
// function to run; a task whose func is NULL makes the worker that picks it
// up leave its loop
GK_TaskFunc func;
// argument handed to func when the task is executed
void *data;
};
// A queue of tasks plus the worker threads that execute them.
struct GK_TaskQueue {
// pending tasks (elements are struct Task, stored by value)
struct GK_Queue *tasks;
// array of num_workers worker records
struct Worker *workers;
unsigned num_workers;
// number of tasks fetched by workers and not yet reported done; incremented
// in get_task() and decremented in task_done()
size_t num_running;
// held while num_running is read or modified
pthread_mutex_t mutex;
// signalled by task_done(); gk_task_queue_wait() sleeps on it
pthread_cond_t cond_running;
};
/*
 * Fetch the next task for a worker, blocking until one is available, and
 * count it as running.
 *
 * Returns whatever gk_queue_get() returned for the task queue.
 *
 * NOTE(review): the task is already removed from the queue when
 * gk_queue_get() returns, but num_running is only incremented afterwards.
 * In between, gk_task_queue_wait() can observe an empty queue together with
 * num_running == 0 and return while this task has not run yet.
 */
static struct Task*
get_task(struct GK_TaskQueue *tq)
{
    struct Task *next = gk_queue_get(tq->tasks);

    pthread_mutex_lock(&tq->mutex);
    tq->num_running += 1;
    pthread_mutex_unlock(&tq->mutex);

    return next;
}
/*
 * Record that a worker finished the task it obtained through get_task() and
 * wake up threads sleeping in gk_task_queue_wait().
 */
static void
task_done(struct GK_TaskQueue *tq)
{
    pthread_mutex_lock(&tq->mutex);
    tq->num_running--;
    // broadcast rather than signal: several threads may be blocked in
    // gk_task_queue_wait() at once, and each of them has to re-check the
    // condition; waking only one could leave the others asleep forever
    pthread_cond_broadcast(&tq->cond_running);
    pthread_mutex_unlock(&tq->mutex);
}
static void*
worker(void *arg)
{
struct Worker *wrk = arg;
bool run = true;
while (run) {
printf("worker %p waits for task\n", wrk->thread);
struct Task *tsk = get_task(wrk->queue);
printf("worker %p started task\n", wrk->thread);
if (tsk && tsk->func) {
// TODO: handle return code
tsk->func(tsk->data);
} else {
run = false;
}
task_done(wrk->queue);
printf("worker %p finished task\n", wrk->thread);
}
return NULL;
}
/**
 * Create a task queue holding up to `size` pending tasks and served by
 * `num_workers` worker threads, which are started immediately.
 *
 * Returns the new task queue, or NULL if an allocation, the initialization
 * of a synchronization primitive, or the creation of a worker thread fails.
 * Release the result with gk_task_queue_free().
 */
struct GK_TaskQueue*
gk_task_queue_new(size_t size, unsigned num_workers)
{
    assert(size > 0);
    assert(num_workers > 0);

    struct GK_TaskQueue *tq = gk_new(struct GK_TaskQueue);
    if (!tq)
        return NULL;

    // put every field the cleanup path looks at into a known state, without
    // relying on the allocator handing out zeroed memory
    tq->tasks = NULL;
    tq->workers = NULL;
    tq->num_workers = 0;
    tq->num_running = 0;

    if (pthread_mutex_init(&tq->mutex, NULL) != 0)
        goto mutex_error;
    if (pthread_cond_init(&tq->cond_running, NULL) != 0)
        goto cond_error;

    tq->tasks = gk_queue_new(sizeof(struct Task), size);
    if (!tq->tasks)
        goto error;

    tq->workers = gk_alloc0(sizeof(struct Worker) * num_workers);
    if (!tq->workers)
        goto error;

    // spawn worker threads
    for (unsigned i = 0; i < num_workers; i++) {
        struct Worker *wrk = &tq->workers[i];
        wrk->queue = tq;
        if (pthread_create(&wrk->thread, NULL, worker, wrk) != 0)
            goto error;
        wrk->initialized = true;
        // only count threads that really started: on failure
        // gk_task_queue_free() must queue stop requests for exactly these,
        // otherwise it could block forever on a full queue nobody drains
        tq->num_workers = i + 1;
    }
    return tq;

error:
    gk_task_queue_free(tq);
    return NULL;
cond_error:
    pthread_mutex_destroy(&tq->mutex);
mutex_error:
    gk_free(tq);
    return NULL;
}
/**
 * Shut down and destroy a task queue.
 *
 * Waits for pending and running tasks via gk_task_queue_wait(), asks every
 * started worker to stop by queueing a task without a function, joins the
 * worker threads and releases all resources. Passing NULL is a no-op.
 */
void
gk_task_queue_free(struct GK_TaskQueue *tq)
{
    if (!tq)
        return;

    if (tq->workers) {
        gk_task_queue_wait(tq);

        // Count the threads that were actually started. Queueing a stop
        // request for a worker that never ran would leave it in the queue
        // forever, and with enough of them gk_task_queue_add() would block
        // on a full queue that nobody drains.
        unsigned started = 0;
        for (unsigned i = 0; i < tq->num_workers; i++) {
            if (tq->workers[i].initialized) {
                started++;
            }
        }

        // one stop request per running worker: each worker exits after
        // receiving a task whose function is NULL
        for (unsigned i = 0; i < started; i++) {
            gk_task_queue_add(tq, NULL, NULL);
        }

        for (unsigned i = 0; i < tq->num_workers; i++) {
            struct Worker *wrk = &tq->workers[i];
            if (wrk->initialized) {
                pthread_join(wrk->thread, NULL);
            }
        }
        gk_free(tq->workers);
    }

    pthread_mutex_destroy(&tq->mutex);
    pthread_cond_destroy(&tq->cond_running);
    gk_queue_free(tq->tasks);
    gk_free(tq);
}
/**
 * Queue the function `f` to be called with `data` by one of the workers.
 *
 * The task is copied into the underlying queue by gk_queue_add(), which
 * blocks while that queue is full. Always returns 1.
 */
int
gk_task_queue_add(struct GK_TaskQueue *tq, GK_TaskFunc f, void *data)
{
    struct Task entry;

    entry.func = f;
    entry.data = data;
    gk_queue_add(tq->tasks, &entry);

    return 1;
}
/**
 * Block until no task is counted as running and the task queue is empty.
 *
 * The condition is re-evaluated each time task_done() signals cond_running.
 * Always returns 1.
 */
int
gk_task_queue_wait(struct GK_TaskQueue *tq)
{
    pthread_mutex_lock(&tq->mutex);
    for (;;) {
        // the queue length is only queried when nothing is running
        if (tq->num_running == 0 && gk_queue_len(tq->tasks) == 0)
            break;
        pthread_cond_wait(&tq->cond_running, &tq->mutex);
    }
    pthread_mutex_unlock(&tq->mutex);

    return 1;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment