Skip to content

Instantly share code, notes, and snippets.

@ofx ofx/threadpool.c
Created Aug 18, 2018

Embed
What would you like to do?
Old threadpool implementation (pthread)
//
// threadpool.c
// threadpool
//
#include "threadpool.h"
static void *sweep(void *voidtask)
{
task *task = voidtask;
task->r = 1;
task->f(task->ndat, task->dat);
return 0;
}
static void *schedule(void *voidPool)
{
task *ta;
threadpool *pool = (threadpool*) voidPool;
for(;;)
{
time_t now = time(0);
// While scheduling, lock push and pop
pthread_mutex_lock(&pool->i);
{
pthread_mutex_lock(&pool->d);
{
// Traverse the linked list and check for tasks exceeding time
for (ta = pool->h ; ta != 0 ; ta = (task*) ta->n)
{
// Running and deadline is not neglected, deadline exceeded
if ((!ta->r && ta->d != -1) && ((now - ta->t) >= ta->d))
{
if (ta == pool->h)
{
pool->h = (task*) ta->n;
if (ta->n != 0)
{
pool->h->p = 0;
}
}
else
{
((task*) ta->p)->n = ta->n;
if (ta->n != 0)
{
ta->n = ta->p;
}
}
pthread_t id;
pthread_create(&id, 0, sweep, (void*) ta);
// Meep
printf("Deadline exceeded, assigning %d to %d...\n", ta->i, (int) id);
}
}
}
pthread_mutex_unlock(&pool->i);
}
pthread_mutex_unlock(&pool->d);
// Sleep for a second
sleep(1);
}
}
static void *work(void *voidPool)
{
threadpool *pool = (threadpool*) voidPool;
for (;;)
{
sweep((void*) threadpool(pool));
}
}
static int get_num_cpus(void)
{
int c = 0;
#ifdef __APPLE__
int retstat;
size_t len = sizeof(c);
int request[2] = {
CTL_HW,
HW_NCPU
};
retstat = sysctl(request, 2, &c, &len, NULL, 0);
if (retstat < 0)
{
perror("sysctl HW_NCPU failed");
exit( EXIT_FAILURE );
}
#elif defined __linux__
c = 0; // :'(
#elif defined _WIN32 || defined _WIN64
c = 0; // :'(
#endif
return c;
}
task *threadpool_pop(threadpool *pool)
{
task *task = 0;
// Lock the pop operation
pthread_mutex_lock(&pool->d);
{
// Wait as long as we don't have a head
if (pool->h == 0)
{
pthread_cond_wait(&pool->e, &pool->d);
}
// Lock the push operation
pthread_mutex_lock(&pool->i);
{
task = pool->h;
pool->h = task->n == 0 ? 0 : task->n;
task->n = task->p = 0;
}
pthread_mutex_unlock(&pool->i);
}
pthread_mutex_unlock(&pool->d);
return task;
}
void threadpool_push(threadpool *pool, task *t)
{
task *ta;
t->t = time(t->n = t->p = t->r = 0);
pthread_mutex_lock(&pool->i);
{
if (pool->h == 0)
{
pool->h = t;
}
else
{
ta = pool->h;
while (ta->n != 0)
{
ta = (task*) ta->n;
}
ta->n = (struct task*) t;
t->p = (struct task*) ta;
}
pthread_cond_signal(&pool->e);
}
pthread_mutex_unlock(&pool->i);
}
int threadpool_free(threadpool *pool)
{
// Meep, meep, meep
int i = 0;
for (; i < pool->n ; ++i)
{
free(*pool->w++);
}
free(pool->s);
free(pool->w);
free(pool);
return 0;
}
threadpool *threadpool_new(const int n)
{
int i;
threadpool *pool = malloc(sizeof(threadpool));
pool->n = n == 0 ? get_num_cpus() * 2 + 1 : n;
pool->s = pool->h = 0;
pthread_mutex_init(&pool->d, 0);
pthread_mutex_init(&pool->i, 0);
pthread_cond_init(&pool->e, 0);
pthread_create(&pool->s, 0, &schedule, (void*)pool);
pool->w = malloc(sizeof(work) * pool->n);
for (i = 0 ; i < pool->n ; ++i)
{
pthread_create(&pool->w[i], 0, &work, (void*)pool);
}
return pool;
}
void threadpool_join(threadpool *pool)
{
int i;
for (i = 0 ; i < pool->n ; ++i)
{
pthread_join(pool->w[i], 0);
}
}
//
// threadpool.h
// threadpool
//
#ifndef threadpool_h
#define threadpool_h
#include <stdio.h>
#include <time.h>
#include <pthread.h>
#include <stdlib.h>
#include <assert.h>
#ifdef __APPLE__
#include <sys/sysctl.h>
#include <sys/types.h>
#include <unistd.h>
#elif defined __linux__
// :'(
#elif defined _WIN32 || defined _WIN64
// :'(
#endif
// Structures
typedef struct
{
time_t t; // Time
time_t d; // Deadline
int r; // Running
int i; // Identifier
struct task *n; // Next in list
struct task *p; // Prev in list
int ndat; // Data length
void *dat; // Data
void (*f)(int, void*); // Function
} task;
typedef struct
{
int n; // Num workers
pthread_t *w; // Workers
pthread_mutex_t d; // Decrement
pthread_mutex_t i; // Increment
task *h; // Head
pthread_t s; // Scheduler
pthread_cond_t e; // Empty
} threadpool;
// Inner workings
static void *sweep(void *);
static void *schedule(void *);
static void *work(void *);
static int get_num_cpus(void);
// Threadpool operations
task *threadpool_pop(threadpool *);
void threadpool_join(threadpool *);
void threadpool_push(threadpool *, task *);
// Construct/destruct
threadpool *threadpool_new(const int);
int threadpool_free(threadpool *);
#endif
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.