@abstractalgo
Forked from vurtun/fibers.c
Created June 7, 2016 20:31
/* _GNU_SOURCE must be defined before any system header so that <sched.h>
 * exposes cpu_set_t/CPU_ZERO/CPU_SET and <pthread.h> exposes
 * pthread_attr_setaffinity_np */
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <assert.h>
#include <string.h>
#define streq(a, b) (!strcmp((a), (b)))
#include <setjmp.h>
#include <sched.h>
#include <unistd.h>
#include <pthread.h>
#include <xmmintrin.h>
#include <ucontext.h>
typedef int8_t i8;
typedef int16_t i16;
typedef int32_t i32;
typedef int64_t i64;
typedef uint8_t u8;
typedef uint16_t u16;
typedef uint32_t u32;
typedef uint64_t u64;
typedef long word;
#define MAX_WORK_QUEUE_THREAD 64
#define FIBER_STACK_SIZE (64 * 1024)
#define MAX_SCHEDULER_WORKER 64
#define MAX_SCHEDULER_FIBERS 128
/* --------------------------------------------------------------
*
* SPINLOCK
*
* --------------------------------------------------------------*/
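/* Minimal test-and-set lock guarding the scheduler's intrusive fiber lists:
 * acquire spins on a CAS from 0 (free) to 1 (held); release fences prior
 * stores and writes 0 back. */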
static void
spinlock_begin(volatile u32 *spinlock)
{/* spin until the lock is swapped from 0 (free) to 1 (held) */
while (__sync_val_compare_and_swap(spinlock, 0, 1) != 0);}
static void
spinlock_end(volatile u32 *spinlock)
{/* flush prior stores, then release */
_mm_sfence(); *spinlock = 0;}
/* --------------------------------------------------------------
*
* SEMAPHORE
*
* --------------------------------------------------------------*/
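/* Counting semaphore built from a pthread mutex and condition variable. Idle
 * workers block in sem_wait once every job queue is empty; scheduler_run
 * posts once per pushed job to wake them up again. */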
struct sem {
pthread_mutex_t guard;
pthread_cond_t cond;
int count;
};
static void
sem_init(struct sem *s, int init)
{
pthread_mutex_init(&s->guard, 0);
pthread_cond_init(&s->cond, 0);
s->count = init;
}
static void
sem_free(struct sem *s)
{
pthread_mutex_destroy(&s->guard);
pthread_cond_destroy(&s->cond);
}
static void
sem_post(struct sem *s, int delta)
{
if (delta < 0) return;
pthread_mutex_lock(&s->guard);
s->count += delta;
pthread_cond_broadcast(&s->cond);
pthread_mutex_unlock(&s->guard);
}
static void
sem_wait(struct sem *s, int delta)
{
if (delta < 0 ) return;
pthread_mutex_lock(&s->guard);
do {
if (s->count >= delta) {
s->count -= delta;
break;
}
pthread_cond_wait(&s->cond, &s->guard);
} while (1);
pthread_mutex_unlock(&s->guard);
}
/* --------------------------------------------------------------
*
* FIBER
*
* --------------------------------------------------------------*/
#undef _FORTIFY_SOURCE
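/* Thin wrapper over <ucontext.h>: fiber_create prepares a context with its
 * own stack and entry callback, fiber_switch_to saves the running context
 * into `prev` and resumes `fib`. Passing a pointer through makecontext's int
 * argument list is a common but not strictly portable idiom, and the
 * _FORTIFY_SOURCE undef above presumably keeps fortified stack checks from
 * tripping over the stack switching these fibers perform. */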
typedef void(*fiber_callback)(void*);
struct sys_fiber {
ucontext_t fib;
};
static void
fiber_create(struct sys_fiber *fib, void *stack, size_t stack_size,
fiber_callback callback, void *data)
{
getcontext(&fib->fib);
fib->fib.uc_stack.ss_size = stack_size;
fib->fib.uc_stack.ss_sp = stack;
fib->fib.uc_link = 0;
makecontext(&fib->fib, (void(*)())callback, 1, data);
}
static void
fiber_switch_to(struct sys_fiber *prev, struct sys_fiber *fib)
{
swapcontext(&prev->fib, &fib->fib);
}
/* --------------------------------------------------------------
*
* QUEUE
*
* --------------------------------------------------------------*/
/* This is an implementation of a multi-producer, multi-consumer non-blocking
 * concurrent FIFO queue based on the paper by Philippas Tsigas and Yi Zhang:
 * www.cse.chalmers.se/~tsigas/papers/latest-spaa01.pdf */
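/* Each slot holds either a job pointer or one of two sentinels: QUEUE_EMPTY
 * (0) for a slot that was never filled and QUEUE_REMOVED (1) for a slot whose
 * job was already dequeued. When push overwrites a REMOVED slot it tags the
 * job pointer's lowest bit so pop knows which sentinel to store back. */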
#define MAX_WORK_QUEUE_JOBS (1024)
#define WORK_QUEUE_MASK (MAX_WORK_QUEUE_JOBS-1)
struct scheduler;
typedef void(*job_callback)(struct scheduler*,void*);
#define JOB_ENTRY_POINT(name) static void name(struct scheduler *sched, void *arg)
#define BASE_ALIGN(x) __attribute__((aligned(x)))
#define QUEUE_EMPTY 0
#define QUEUE_REMOVED 1
struct job {
void *data;
job_callback callback;
volatile u32 *run_count;
};
struct job_queue {
volatile u32 head;
struct job *jobs[MAX_WORK_QUEUE_JOBS];
volatile u32 tail;
};
static void
job_queue_init(struct job_queue *q)
{
memset(q, 0, sizeof(*q));
q->jobs[0] = QUEUE_EMPTY;
q->head = 0;
q->tail = 1;
}
static int
job_queue_entry_free(struct job *p)
{
return (((uintptr_t)p == QUEUE_EMPTY) || ((uintptr_t)p == QUEUE_REMOVED));
}
static int
job_queue_push(struct job_queue *q, struct job *job)
{
while (1) {
/* read tail */
u32 te = q->tail;
u32 ate = te;
struct job *tt = q->jobs[ate];
u32 tmp = (ate + 1) & WORK_QUEUE_MASK;
/* we want to find the actual tail */
while (!(job_queue_entry_free(tt))) {
/* check tails consistency */
if (te != q->tail) goto retry;
/* check if queue is full */
if (tmp == q->head) break;
tt = q->jobs[tmp];
ate = tmp;
tmp = (ate + 1) & WORK_QUEUE_MASK;
}
/* check tails consistency */
if (te != q->tail) continue;
/* check if queue is full */
if (tmp == q->head) {
ate = (tmp + 1) & WORK_QUEUE_MASK;
tt = q->jobs[ate];
if (!(job_queue_entry_free(tt)))
return 0; /* queue is full */
/* let pop update header */
__sync_bool_compare_and_swap(&q->head, tmp, ate);
continue;
}
if ((uintptr_t)tt == QUEUE_REMOVED)
job = (struct job*)((uintptr_t)job | 0x01);
if (te != q->tail) continue;
if (__sync_bool_compare_and_swap(&q->jobs[ate], tt, job)) {
if ((tmp & 1) == 0)
__sync_bool_compare_and_swap(&q->tail, te, tmp);
return 1;
}
retry:;
}
}
static int
job_queue_pop(struct job **job, struct job_queue *q)
{
while (1) {
u32 th = q->head;
u32 tmp = (th + 1) & WORK_QUEUE_MASK;
struct job *tt = q->jobs[tmp];
struct job *tnull = 0;
/* we want to find the actual head */
while ((job_queue_entry_free(tt))) {
if (th != q->head) goto retry;
if (tmp == q->tail) return 0;
tmp = (tmp + 1) & WORK_QUEUE_MASK;
tt = q->jobs[tmp];
}
/* check head's consistency */
if (th != q->head) continue;
/* check if queue is empty */
if (tmp == q->tail) {
/* help push to update end */
__sync_bool_compare_and_swap(&q->tail, tmp, (tmp+1) & WORK_QUEUE_MASK);
continue; /* retry */
}
tnull = (((uintptr_t)tt & 0x01) ? (struct job*)QUEUE_REMOVED: (struct job*)QUEUE_EMPTY);
if (th != q->head) continue;
/* get actual head */
if (__sync_bool_compare_and_swap(&q->jobs[tmp], tt, tnull)) {
if ((tmp & 0x1) == 0)
__sync_bool_compare_and_swap(&q->head, th, tmp);
*job = (struct job*)((uintptr_t)tt & ~(uintptr_t)1);
return 1;
}
retry:;
}
}
/* --------------------------------------------------------------
*
* SCHEDULER
*
* --------------------------------------------------------------*/
typedef volatile u32 job_counter;
enum job_queue_ids {
JOB_QUEUE_LOW,
JOB_QUEUE_NORMAL,
JOB_QUEUE_HIGH,
JOB_QUEUE_COUNT
};
struct scheduler_fiber {
struct scheduler_fiber *next;
struct scheduler_fiber *prev;
void *stack;
size_t stack_size;
struct sys_fiber handle;
struct job job;
u32 value;
};
struct scheduler_worker {
int id;
pthread_t thread;
struct scheduler_fiber *context;
struct scheduler *sched;
};
typedef void (*scheduler_profiler_callback_f)(void*, int thread_id);
struct scheduler_profiling {
void *userdata;
scheduler_profiler_callback_f thread_start;
scheduler_profiler_callback_f thread_stop;
scheduler_profiler_callback_f context_switch;
scheduler_profiler_callback_f wait_start;
scheduler_profiler_callback_f wait_stop;
};
struct scheduler {
struct sem work_sem;
struct job_queue queue[JOB_QUEUE_COUNT];
int worker_count;
struct scheduler_worker worker[MAX_SCHEDULER_WORKER];
volatile u32 worker_running;
volatile u32 worker_active;
struct scheduler_profiling profiler;
int fiber_count;
struct scheduler_fiber fibers[MAX_SCHEDULER_FIBERS];
volatile u32 wait_list_lock;
struct scheduler_fiber *wait_list;
volatile u32 free_list_lock;
struct scheduler_fiber *free_list;
};
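/* Overall flow: jobs are pushed into one of three priority queues and picked
 * up by worker threads (one pinned per core, with the main thread acting as
 * the last worker). A worker executes jobs on its current fiber; when a job
 * calls scheduler_wait_for, that fiber is parked on the wait list and the
 * worker switches to either a fiber whose counter already reached its target
 * value or a fresh fiber taken from the fiber pool / free list. */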
static struct job
Job(job_callback callback, void *data)
{
struct job task;
task.callback = callback;
task.data = data;
task.run_count = 0;
return task;
}
static void
scheduler_hook_into_list(struct scheduler_fiber **list,
struct scheduler_fiber *element, volatile u32 *lock)
{
spinlock_begin(lock);
if (!*list) {
*list = element;
element->prev = 0;
element->next = 0;
} else {
element->prev = 0;
element->next = *list;
(*list)->prev = element;
*list = element;
}
spinlock_end(lock);
}
static void
scheduler_unhook_from_list(struct scheduler_fiber **list,
struct scheduler_fiber *element, volatile u32 *lock)
{
spinlock_begin(lock);
if (element->next)
element->next->prev = element->prev;
if (element->prev)
element->prev->next = element->next;
if (*list == element)
*list = element->next;
element->next = element->prev = 0;
spinlock_end(lock);
}
static struct scheduler_fiber*
scheduler_find_fiber_finished_waiting(struct scheduler *s)
{
struct scheduler_fiber *iter;
spinlock_begin(&s->wait_list_lock);
iter = s->wait_list;
while (iter) {
if (*iter->job.run_count == iter->value) break;
iter = iter->next;
}
spinlock_end(&s->wait_list_lock);
return iter;
}
static struct scheduler_fiber*
scheduler_get_free_fiber(struct scheduler *s)
{
struct scheduler_fiber *fib = 0;
spinlock_begin(&s->free_list_lock);
if (s->fiber_count < MAX_SCHEDULER_FIBERS) {
fib = &s->fibers[s->fiber_count++];
} else if (s->free_list) {
fib = s->free_list;
}
spinlock_end(&s->free_list_lock);
return fib;
}
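/* Publishes `count` jobs to the given priority queue. The shared counter is
 * set to `count` up front and every job decrements it once on completion, so
 * a later scheduler_wait_for(s, counter, 0) returns only after the whole
 * batch has finished. One semaphore post per pushed job wakes sleeping
 * workers. */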
static void
scheduler_run(struct scheduler *s, enum job_queue_ids q, struct job *jobs,
u32 count, job_counter *counter)
{
u32 jobIndex = 0;
struct job_queue *queue;
assert(q < JOB_QUEUE_COUNT);
assert(counter);
assert(jobs);
assert(s);
queue = &s->queue[q];
/* publish the batch size before any job can start decrementing the counter */
*counter = count;
while (jobIndex < count) {
jobs[jobIndex].run_count = counter;
if (job_queue_push(queue, &jobs[jobIndex])) {
sem_post(&s->work_sem, 1);
jobIndex++;
}
}
}
static void
fiber_proc(void *arg)
{
struct scheduler_worker *w = (struct scheduler_worker*)arg;
struct scheduler *s = w->sched;
__sync_add_and_fetch(&s->worker_active, 1);
while (1) {
/* check if any fiber is done waiting */
struct scheduler_fiber *fiber = scheduler_find_fiber_finished_waiting(s);
if (fiber) {
/* put old worker context into freelist, preserving its stack for reuse */
struct scheduler_fiber *old = w->context;
void *stack = old->stack;
size_t stack_size = old->stack_size;
memset(old, 0, sizeof(*old));
old->stack = stack;
old->stack_size = stack_size;
scheduler_unhook_from_list(&s->wait_list, fiber, &s->wait_list_lock);
scheduler_hook_into_list(&s->free_list, old, &s->free_list_lock);
/* set previously waiting fiber as worker context */
w->context = fiber;
if (s->profiler.context_switch)
s->profiler.context_switch(s->profiler.userdata, w->id);
fiber_switch_to(&old->handle, &w->context->handle);
}
/* check if any new jobs inside work queues */
{struct job *job = 0;
if (!(job_queue_pop(&job, &s->queue[JOB_QUEUE_HIGH])) &&
!(job_queue_pop(&job, &s->queue[JOB_QUEUE_NORMAL])) &&
!(job_queue_pop(&job, &s->queue[JOB_QUEUE_LOW]))) {
/* currently no job so wait */
__sync_sub_and_fetch(&s->worker_active, 1);
if (s->profiler.wait_start)
s->profiler.wait_start(s->profiler.userdata, w->id);
sem_wait(&s->work_sem, 1);
if (s->profiler.wait_stop)
s->profiler.wait_stop(s->profiler.userdata, w->id);
__sync_add_and_fetch(&s->worker_active, 1);
} else {
/* run dequeued job */
w->context->job = *job;
assert(job->callback);
if (s->profiler.thread_start)
s->profiler.thread_start(s->profiler.userdata, w->id);
job->callback(s, job->data);
if (s->profiler.thread_stop)
s->profiler.thread_stop(s->profiler.userdata, w->id);
__sync_sub_and_fetch(job->run_count, 1);
}}
}
}
static void*
thread_proc(void *arg)
{
struct scheduler_worker *w = (struct scheduler_worker*)arg;
struct scheduler_fiber *fiber;
struct scheduler *s = w->sched;
__sync_add_and_fetch(&s->worker_running, 1);
/* create dummy fiber */
fiber = scheduler_get_free_fiber(s);
assert(fiber);
getcontext(&fiber->handle.fib);
fiber->handle.fib.uc_link = 0;
fiber->handle.fib.uc_stack.ss_size = 0;
fiber->handle.fib.uc_stack.ss_sp = 0;
w->context = fiber;
fiber_proc(w);
return 0;
}
static void
scheduler_wait_for(struct scheduler *s, job_counter *counter, u32 value)
{
struct scheduler_worker *w = 0;
struct scheduler_fiber *old;
assert(s);
assert(counter);
/* find this thread's own worker state */
{int worker_index = 0;
pthread_t self = pthread_self();
for (; worker_index < s->worker_count; ++worker_index) {
if (pthread_equal(s->worker[worker_index].thread, self))
w = &s->worker[worker_index];
}}
assert(w);
/* insert current context into waiting list */
old = w->context;
w->context->value = value;
w->context->job.run_count = counter;
scheduler_hook_into_list(&s->wait_list, old, &s->wait_list_lock);
/* either resume a fiber that finished waiting or start a new one */
w->context = scheduler_find_fiber_finished_waiting(s);
if (!w->context) {
w->context = scheduler_get_free_fiber(s);
assert(w->context);
scheduler_unhook_from_list(&s->free_list, w->context, &s->free_list_lock);
fiber_create(&w->context->handle, w->context->stack, w->context->stack_size, fiber_proc, w);
} else scheduler_unhook_from_list(&s->wait_list, w->context, &s->wait_list_lock);
if (s->profiler.context_switch)
s->profiler.context_switch(s->profiler.userdata, w->id);
fiber_switch_to(&old->handle, &w->context->handle);
}
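/* Typical fan-out/join usage from inside a job entry point (a sketch; my_job
 * and payload are hypothetical names):
 *
 *   job_counter counter = 0;
 *   struct job batch[4];
 *   for (int i = 0; i < 4; ++i) batch[i] = Job(my_job, &payload[i]);
 *   scheduler_run(sched, JOB_QUEUE_NORMAL, batch, 4, &counter);
 *   scheduler_wait_for(sched, &counter, 0);
 *
 * The wait parks the calling fiber until the counter has dropped to zero. */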
static void
sched_init(struct scheduler *sched, size_t worker_count)
{
size_t thread_index = 0;
pthread_attr_t attr;
assert(sched);
assert(worker_count);
assert(worker_count <= MAX_SCHEDULER_WORKER);
memset(sched, 0, sizeof(*sched));
sched->worker_count = (int)worker_count;
/* init semaphore and job queues */
sem_init(&sched->work_sem, 0);
job_queue_init(&sched->queue[0]);
job_queue_init(&sched->queue[1]);
job_queue_init(&sched->queue[2]);
/* init fibers */
{int fiber_index = 0;
for (; fiber_index < MAX_SCHEDULER_FIBERS; ++fiber_index) {
struct scheduler_fiber *fiber = sched->fibers + fiber_index;
fiber->stack_size = FIBER_STACK_SIZE;
fiber->stack = calloc(fiber->stack_size, 1);
}}
/* start worker threads */
pthread_attr_init(&attr);
for (; thread_index < worker_count-1; ++thread_index) {
cpu_set_t cpus;
sched->worker[thread_index].id = (int)thread_index;
sched->worker[thread_index].sched = sched;
/* bind thread to core */
CPU_ZERO(&cpus);
CPU_SET(thread_index, &cpus);
pthread_attr_setaffinity_np(&attr, sizeof(cpu_set_t), &cpus);
/* start worker thread */
pthread_create(&sched->worker[thread_index].thread, &attr, thread_proc, &sched->worker[thread_index]);
pthread_detach(sched->worker[thread_index].thread);
}
/* initialize main thread as worker thread */
{cpu_set_t cpus;
CPU_ZERO(&cpus);
CPU_SET(thread_index, &cpus);
sched->worker[thread_index].sched = sched;
sched->worker[thread_index].thread = pthread_self();
sched->worker[thread_index].id = (int)thread_index;
pthread_setaffinity_np(sched->worker[thread_index].thread, sizeof(cpu_set_t), &cpus);}
/* create fiber for main thread worker */
{struct scheduler_fiber *fiber;
fiber = scheduler_get_free_fiber(sched);
assert(fiber);
getcontext(&fiber->handle.fib);
fiber->handle.fib.uc_link = 0;
fiber->handle.fib.uc_stack.ss_size = 0;
fiber->handle.fib.uc_stack.ss_sp = 0;
sched->worker[thread_index].context = fiber;}
pthread_attr_destroy(&attr);
}
static void
sched_free(struct scheduler *sched)
{
/* free fibers stack */
{int fiber_index = 0;
for (; fiber_index < MAX_SCHEDULER_FIBERS; ++fiber_index) {
struct scheduler_fiber *fiber = sched->fibers + fiber_index;
free(fiber->stack);
}}
sem_free(&sched->work_sem);
}
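/* Note: sched_free only releases the fiber stacks and the semaphore; worker
 * threads are detached and loop forever in fiber_proc, so this demo simply
 * lets the process exit from main. */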
/* --------------------------------------------------------------
*
* TEST
*
* --------------------------------------------------------------*/
struct game_state {
int data;
};
struct test_data {
int *data;
int from;
int to;
};
JOB_ENTRY_POINT(test_work)
{
int i;
struct test_data *data = arg;
for (i = data->from; i < data->to; ++i)
data->data[i] = i;
sleep(1);
}
JOB_ENTRY_POINT(root)
{
struct game_state *game = arg;
job_counter counter = 0;
struct job jobs[8];
struct test_data data[8];
int i, n[2*1024];
printf("root\n");
for (i = 0; i < 8; ++i) {
data[i].data = n;
data[i].from = i * 256;
data[i].to = (i+1)*256;
jobs[i] = Job(test_work, &data[i]);
}
scheduler_run(sched, JOB_QUEUE_HIGH, jobs, 8, &counter);
printf("run\n");
scheduler_wait_for(sched, &counter, 0);
printf("done\n");
}
int main(int argc, char **argv)
{
struct game_state app;
struct job job = Job(root, &app);
job_counter counter = 0;
/* start root process */
struct scheduler sched;
size_t thread_count = (size_t)sysconf(_SC_NPROCESSORS_ONLN);
sched_init(&sched, thread_count);
printf("init\n");
scheduler_run(&sched, JOB_QUEUE_HIGH, &job, 1, &counter);
printf("run\n");
scheduler_wait_for(&sched, &counter, 0);
printf("finished\n");
return 0;
}