Skip to content

Instantly share code, notes, and snippets.

@brokenprogrammer
Last active June 26, 2022 06:25
Show Gist options
  • Select an option

  • Save brokenprogrammer/5f69104f114febd904cc4b1ede60a625 to your computer and use it in GitHub Desktop.

Select an option

Save brokenprogrammer/5f69104f114febd904cc4b1ede60a625 to your computer and use it in GitHub Desktop.
bool
_IsEmpty(threadpool* Threadpool)
{
return ((Threadpool->WorkWriteIndex - Threadpool->WorkReadIndex) == 0);
}
bool
_HasWorkInQueue(threadpool* Threadpool)
{
return ((Threadpool->WorkWriteIndex - Threadpool->WorkReadIndex) != 0);
}
typedef struct
{
threadpool_work_function Function;
void *Argument;
bool Success;
} threadpool_work_copy;
// NOTE(Oskar): Helper function to pull the next work item from the queue.
threadpool_work_copy
_ThreadpoolGetWork(threadpool *Threadpool)
{
threadpool_work_copy Work = { 0 };
Work.Success = false;
if (Threadpool == NULL)
{
return Work;
}
if ((Threadpool->WorkWriteIndex - Threadpool->WorkReadIndex) == 0)
{
return Work;
}
uint32_t OriginalNextEntryToRead = Threadpool->WorkReadIndex;
uint32_t NewNextEntryToRead = (OriginalNextEntryToRead + 1) % Threadpool->Size;
if(OriginalNextEntryToRead != Threadpool->WorkWriteIndex)
{
threadpool_work *WorkPtr = Threadpool->Work + Threadpool->WorkReadIndex;
Work.Function = WorkPtr->Function;
Work.Argument = WorkPtr->Argument;
Work.Success = true;
Threadpool->WorkReadIndex = NewNextEntryToRead;
}
return Work;
}
// NOTE(Oskar): Thread worker function.
void *
_ThreadpoolWorker(void *Argument)
{
threadpool *Threadpool = Argument;
threadpool_work_copy Work;
while (1)
{
// NOTE(Oskar): Lock to prevent anything else to use threadpool members.
pthread_mutex_lock(&Threadpool->WorkMutex);
// NOTE(Oskar): Check if there is work available. If not we wait in conditional
// cond_wait unlocks the mutex and upon signal reaquires the lock.
while (!Threadpool->Stop &&
_IsEmpty(Threadpool))
{
pthread_cond_wait(&Threadpool->WorkCondition, &Threadpool->WorkMutex);
}
// NOTE(Oskar): Thread was signalled to stop. Not unlocking mutex untill
// end of function since we want to manipulate variables.
if (Threadpool->Stop)
{
break;
}
// NOTE(Oskar): Pull work from queue and increment working count.
// WorkingCount tells the pool that there are threads working.
// Unlock mutex so other threads can work as well.
Work = _ThreadpoolGetWork(Threadpool);
Threadpool->WorkingCount++;
pthread_mutex_unlock(&Threadpool->WorkMutex);
// NOTE(Oskar): If there was work then we process it.
if (Work.Success)
{
Work.Function(Work.Argument);
}
// NOTE(Oskar): Finally lock mutex again and decrement working count.
// If no threads are working and no items in queue then we signal the wait
// function to wake up.
pthread_mutex_lock(&Threadpool->WorkMutex);
Threadpool->WorkingCount--;
if (!Threadpool->Stop &&
Threadpool->WorkingCount == 0 &&
_IsEmpty(Threadpool))
{
pthread_cond_signal(&Threadpool->WorkingCondition);
}
pthread_mutex_unlock(&Threadpool->WorkMutex);
}
// NOTE(Oskar): If thread was told to stop we decrement thread count and
// signal the waiting function that a thread has exited.
Threadpool->ThreadCount--;
pthread_cond_signal(&Threadpool->WorkingCondition);
pthread_mutex_unlock(&Threadpool->WorkMutex);
return NULL;
}
threadpool *
ThreadpoolCreate(size_t NumberOfThreads, size_t WorkSize)
{
threadpool *Threadpool;
pthread_t Thread;
// NOTE(Oskar): Default to 2 threads if no number was specified.
if (NumberOfThreads == 0)
{
NumberOfThreads = 2;
}
Threadpool = calloc(1, sizeof(threadpool));
Threadpool->Work = calloc(WorkSize, sizeof(threadpool_work));
Threadpool->ThreadCount = NumberOfThreads;
Threadpool->Size = WorkSize;
pthread_mutex_init(&Threadpool->WorkMutex, NULL);
pthread_cond_init(&Threadpool->WorkCondition, NULL);
pthread_cond_init(&Threadpool->WorkingCondition, NULL);
Threadpool->WorkReadIndex = 0;
Threadpool->WorkWriteIndex = 0;
// NOTE(Oskar): Create number of threads. Threadpool worker is specified as
// working function and threads are detached so that they will be cleaned up on exit.
for (size_t Index = 0; Index < NumberOfThreads; ++Index)
{
pthread_create(&Thread, NULL, _ThreadpoolWorker, Threadpool);
pthread_detach(Thread);
}
return Threadpool;
}
void
ThreadpoolDelete(threadpool *Threadpool)
{
if (Threadpool == NULL)
{
return;
}
pthread_mutex_lock(&Threadpool->WorkMutex);
// NOTE(Oskar): Tell threads they need to stop.
Threadpool->Stop = true;
pthread_cond_broadcast(&Threadpool->WorkCondition);
pthread_mutex_unlock(&Threadpool->WorkMutex);
// NOTE(Oskar): Wait for threads already in the proces of working to finish.
ThreadpoolWait(Threadpool);
pthread_mutex_destroy(&Threadpool->WorkMutex);
pthread_cond_destroy(&Threadpool->WorkCondition);
pthread_cond_destroy(&Threadpool->WorkingCondition);
free(Threadpool->Work);
free(Threadpool);
}
bool
ThreadpoolAddWork(threadpool *Threadpool, threadpool_work_function Function, void *Argument)
{
threadpool_work *Work;
if (Threadpool == NULL)
{
return false;
}
pthread_mutex_lock(&Threadpool->WorkMutex);
uint32_t NewNextEntryToWrite = (Threadpool->WorkWriteIndex + 1) % Threadpool->Size;
// assert(NewNextEntryToWrite != Threadpool->WorkReadIndex);
// if (NewNextEntryToWrite == Threadpool->WorkReadIndex)
if (NewNextEntryToWrite == Threadpool->WorkReadIndex)
{
pthread_mutex_unlock(&Threadpool->WorkMutex);
return false;
}
Work = Threadpool->Work + Threadpool->WorkWriteIndex;
Work->Argument = Argument;
Work->Function = Function;
Threadpool->WorkWriteIndex = NewNextEntryToWrite;
pthread_cond_broadcast(&Threadpool->WorkCondition);
pthread_mutex_unlock(&Threadpool->WorkMutex);
return true;
}
// NOTE(Oskar): Blocking function that will only return when there is no work
// left to be done.
void
ThreadpoolWait(threadpool *Threadpool)
{
if (Threadpool == NULL)
{
return;
}
pthread_mutex_lock(&Threadpool->WorkMutex);
while (1)
{
if ((!Threadpool->Stop && Threadpool->WorkingCount != 0) ||
(Threadpool->Stop && Threadpool->ThreadCount != 0) ||
_HasWorkInQueue(Threadpool))
{
pthread_cond_wait(&Threadpool->WorkingCondition, &Threadpool->WorkMutex);
}
else
{
break;
}
}
pthread_mutex_unlock(&Threadpool->WorkMutex);
}
#include <stdbool.h>
#include <stdint.h>
typedef void (*threadpool_work_function)(void *arg);
typedef struct
{
threadpool_work_function Function;
void *Argument;
} threadpool_work;
typedef struct
{
threadpool_work *Work;
size_t Size;
uint32_t volatile WorkReadIndex;
uint32_t volatile WorkWriteIndex;
pthread_mutex_t WorkMutex;
pthread_cond_t WorkCondition;
pthread_cond_t WorkingCondition;
size_t WorkingCount;
size_t ThreadCount;
bool Stop;
} threadpool;
threadpool *ThreadpoolCreate(size_t NumberOfThreads, size_t WorkSize);
void ThreadpoolDelete(threadpool *Threadpool);
bool ThreadpoolAddWork(threadpool *Threadpool, threadpool_work_function Function, void *Argument);
void ThreadpoolWait(threadpool *Threadpool);
#include <stdlib.h>
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <assert.h>
#include "om_threadpool.h"
#include "om_threadpool.c"
static const size_t num_threads = 10;
static const size_t num_items = 999;
void worker(void *arg)
{
int *val = arg;
*val = *val + 1000;
usleep(500);
}
int main()
{
threadpool *tm;
int *vals;
size_t i;
tm = ThreadpoolCreate(num_threads, 500);
printf("Created tpool\n");
vals = calloc(num_items, sizeof(*vals));
printf("Created vals\n");
size_t Stop = 0;
for (i=0; i<num_items; i++) {
vals[i] = i;
if (!ThreadpoolAddWork(tm, worker, vals+i))
{
Stop = i;
printf("FAILED TO ADD WORK! %ld\n", i);
break;
}
}
printf("added work\n");
ThreadpoolWait(tm);
for (i=0; i<Stop; i++)
{
printf("%d\n", vals[i]);
assert(vals[i] == (i + 1000));
}
free(vals);
ThreadpoolDelete(tm);
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment