Skip to content

Instantly share code, notes, and snippets.

@dariomanesku
Forked from vurtun/fibers.c
Created October 10, 2017 15:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dariomanesku/e0d803c79f64c123c48a849b0a71bcf3 to your computer and use it in GitHub Desktop.
Save dariomanesku/e0d803c79f64c123c48a849b0a71bcf3 to your computer and use it in GitHub Desktop.
#include <stdio.h>
#include <errno.h>
#include <stdlib.h>
#include <stdint.h>
#include <assert.h>
#include <string.h>
#define streq(a, b) (!strcmp((a), (b)))
#ifndef __USE_GNU
#define __USE_GNU
#endif
#include <setjmp.h>
#include <sched.h>
#include <unistd.h>
#include <pthread.h>
#include "xmmintrin.h"
#include <ucontext.h>
#include <sys/mman.h>
typedef int8_t i8;
typedef int16_t i16;
typedef int32_t i32;
typedef int64_t i64;
typedef uint8_t u8;
typedef uint16_t u16;
typedef uint32_t u32;
typedef uint64_t u64;
#define MAX_WORK_QUEUE_THREAD 64
#define FIBER_STACK_SIZE (64 * 1024)
#define MAX_SCHEDULER_WORKER 64
#define MAX_SCHEDULER_FIBERS 128
/* --------------------------------------------------------------
*
* BITSET
*
* --------------------------------------------------------------*/
#define BITS_PER_BYTE 8
#define BITS_PER_LONG 64
#define BIT_WORD(nr) ((nr) / BITS_PER_LONG)
#define DIV_ROUND_UP(n,d) (((n) + (d) - 1) / (d))
#define BITS_TO_LONGS(nr) DIV_ROUND_UP(nr, BITS_PER_BYTE * sizeof(u64))
#define BITMAP_FIRST_WORD_MASK(start) (~0UL << ((start) % (BITS_PER_LONG-1)))
#define BITMAP_LAST_WORD_MASK(nbits) (((nbits) % BITS_PER_LONG) ? (1UL << ((nbits) % BITS_PER_LONG))-1: ~0UL)
#define DECLARE_BITMAP(name, bits) u64 name[BITS_TO_LONGS(bits)]
#define min(a,b) (((a) < (b)) ? (a): (b))
#define max(a,b) (((a) > (b)) ? (a): (b))
static u64
bit_find_first_set(const u64 *addr, u64 size)
{
u64 idx;
#define ffs(x) (u64)__builtin_ffsl((long int)(x))
for (idx = 0; idx * BITS_PER_LONG < size; idx++) {
if (addr[idx]) {
u64 first_set = ffs(addr[idx]) - 1;
return min(idx * BITS_PER_LONG + first_set, size);
}
}
return size;
}
static void
bit_fill(u64 *dst, unsigned int nbits)
{
unsigned int nlongs = (unsigned int)BITS_TO_LONGS(nbits);
unsigned int len = (unsigned int)((nlongs - 1) * sizeof(u64));
memset(dst, 0xff, len);
}
static void
bit_or(u64 *dst, const u64 *bitmap1, const u64 *bitmap2,
unsigned int nbits)
{
unsigned int k;
unsigned int nr = (unsigned int)BITS_TO_LONGS(nbits);
for (k = 0; k < nr; ++k)
dst[k] = bitmap1[k] | bitmap2[k];
}
static void
bit_set(u64 *bitset, unsigned int start, int len)
{
u64 *p = bitset + BIT_WORD(start);
const unsigned int size = start + (unsigned int)len;
int bits_to_set = (int)(BITS_PER_LONG - (start % BITS_PER_LONG));
u64 mask_to_set = BITMAP_FIRST_WORD_MASK(start);
while (len - bits_to_set >= 0) {
*p |= mask_to_set;
len -= bits_to_set;
bits_to_set = BITS_PER_LONG;
mask_to_set = ~0UL;
p++;
}
if (len) {
mask_to_set &= BITMAP_LAST_WORD_MASK(size);
*p |= mask_to_set;
}
}
static void
bit_clear(u64 *bitset, unsigned int start, int len)
{
u64 *p = bitset + BIT_WORD(start);
const unsigned int size = start + (unsigned int)len;
int bits_to_clear = (int)(BITS_PER_LONG - (start % BITS_PER_LONG));
unsigned int long mask_to_clear = BITMAP_FIRST_WORD_MASK(start);
while (len - bits_to_clear >= 0) {
*p &= ~mask_to_clear;
len -= bits_to_clear;
bits_to_clear = BITS_PER_LONG;
mask_to_clear = ~0UL;
p++;
}
if (len) {
mask_to_clear &= BITMAP_LAST_WORD_MASK(size);
*p &= ~mask_to_clear;
}
}
/* --------------------------------------------------------------
*
* SPINLOCK
*
* --------------------------------------------------------------*/
static void
spinlock_begin(volatile u32 *spinlock)
{while (__sync_val_compare_and_swap(spinlock, 1, 0) != 0) _mm_pause();}
static void
spinlock_end(volatile u32 *spinlock)
{_mm_sfence(); *spinlock = 0;}
/* --------------------------------------------------------------
*
* MEMORY
*
* --------------------------------------------------------------*/
#define MEMORY_BLOCK_SIZE (2*1024*1024) /* 2MB page size */
#define MAX_MEMORY_BLOCKS 512 /* 1GB memory */
typedef DECLARE_BITMAP(memory_bitmap, MAX_MEMORY_BLOCKS);
enum memory_tag {
MEMORY_TAG_GAME,
MEMORY_TAG_RENDER,
MEMORY_TAG_GPU,
MEMORY_TAG_COUNT
};
struct memory {
volatile u32 lock;
memory_bitmap free;
memory_bitmap tags[MEMORY_TAG_COUNT];
void *data;
size_t size;
};
static void
mem_init(struct memory *mem)
{
int i;
memset(mem, 0, sizeof(*mem));
mem->size = (size_t)MEMORY_BLOCK_SIZE * (size_t)MAX_MEMORY_BLOCKS;
mem->data = mmap(0, mem->size, PROT_READ|PROT_WRITE,
MAP_PRIVATE|MAP_HUGETLB|MAP_ANONYMOUS, -1, 0);
if (mem->data == MAP_FAILED)
fprintf(stderr, "%s\n", strerror(errno));
assert(mem->data != MAP_FAILED);
bit_fill(mem->free, MAX_MEMORY_BLOCKS);
}
static void
mem_clear(struct memory *mem)
{
munmap(mem->data, mem->size);
memset(mem, 0, sizeof(*mem));
}
static void
mem_free(struct memory *mem, enum memory_tag tag)
{
spinlock_begin(&mem->lock);
bit_or(mem->free, mem->free, mem->tags[tag], MAX_MEMORY_BLOCKS);
memset(mem->tags[tag], 0, sizeof(mem->tags[tag]));
spinlock_end(&mem->lock);
}
static void*
alloc_block(struct memory *mem, enum memory_tag tag)
{
unsigned int index;
spinlock_begin(&mem->lock);
index = (unsigned int)bit_find_first_set(mem->free, MAX_MEMORY_BLOCKS);
assert(index < MAX_MEMORY_BLOCKS);
{char *block;
bit_clear(mem->free, index, 1);
bit_set(mem->tags[tag], index, 1);
block = (char*)mem->data + (MEMORY_BLOCK_SIZE * index);
spinlock_end(&mem->lock);
return block;}
}
/* --------------------------------------------------------------
*
* ALLOCATOR
*
* --------------------------------------------------------------*/
struct block {
void *data;
size_t size;
size_t capacity;
};
struct allocator {
enum memory_tag tag;
struct memory *memory;
struct block blocks[MAX_SCHEDULER_WORKER];
size_t worker_count;
};
static void
alloctor_init(struct allocator *a, size_t worker_count,
struct memory *memory, enum memory_tag tag)
{
unsigned int i = 0;
memset(a, 0, sizeof(*a));
a->tag = tag;
a->memory = memory;
a->worker_count = worker_count;
for(;i < a->worker_count; ++i) {
a->blocks[i].data = alloc_block(a->memory, tag);
a->blocks[i].capacity = MEMORY_BLOCK_SIZE;
a->blocks[i].size = 0;
}
}
static void
allocator_clear(struct allocator *a)
{
unsigned int i = 0;
for(;i < a->worker_count; ++i) {
a->blocks[i].size = 0;
a->blocks[i].data = alloc_block(a->memory, a->tag);
a->blocks[i].capacity = MEMORY_BLOCK_SIZE;
}
}
static void*
alloc(struct allocator *a, int thread_index, size_t size)
{
void *memory = 0;
assert(size < MEMORY_BLOCK_SIZE);
if ((a->blocks[thread_index].size + size) > a->blocks[thread_index].capacity) {
/* allocate new worker memory block */
a->blocks[thread_index].data = alloc_block(a->memory, a->tag);
assert(a->blocks[thread_index].data);
a->blocks[thread_index].size = 0;
}
/* allocate from worker memory block */
memory = (char*)a->blocks[thread_index].data + a->blocks[thread_index].size;
a->blocks[thread_index].size += size;
return memory;
}
/* --------------------------------------------------------------
*
* SEMAPHORE
*
* --------------------------------------------------------------*/
struct sem {
pthread_mutex_t guard;
pthread_cond_t cond;
int count;
};
static void
sem_init(struct sem *s, int init)
{
pthread_mutex_init(&s->guard, 0);
pthread_cond_init(&s->cond, 0);
s->count = init;
}
static void
sem_free(struct sem *s)
{
pthread_mutex_destroy(&s->guard);
pthread_cond_destroy(&s->cond);
}
static void
sem_post(struct sem *s, int delta)
{
if (delta < 0) return;
pthread_mutex_lock(&s->guard);
s->count += delta;
pthread_cond_broadcast(&s->cond);
pthread_mutex_unlock(&s->guard);
}
static void
sem_wait(struct sem *s, int delta)
{
if (delta < 0) return;
pthread_mutex_lock(&s->guard);
do {
if (s->count >= delta) {
s->count -= delta;
break;
}
pthread_cond_wait(&s->cond, &s->guard);
} while (1);
pthread_mutex_unlock(&s->guard);
}
/* --------------------------------------------------------------
*
* FIBER
*
* --------------------------------------------------------------*/
#undef _FORTIFY_SOURCE
typedef void(*fiber_callback)(void*);
struct sys_fiber {
ucontext_t fib;
};
static void
fiber_create(struct sys_fiber *fib, void *stack, size_t stack_size,
fiber_callback callback, void *data)
{
getcontext(&fib->fib);
fib->fib.uc_stack.ss_size = stack_size;
fib->fib.uc_stack.ss_sp = stack;
fib->fib.uc_link = 0;
makecontext(&fib->fib, (void(*)())callback, 1, data);
}
static void
fiber_switch_to(struct sys_fiber *prev, struct sys_fiber *fib)
{
swapcontext(&prev->fib, &fib->fib);
}
/* --------------------------------------------------------------
*
* QUEUE
*
* --------------------------------------------------------------*/
/* This is an implementation of a multi producer and consumer Non-Blocking
* Concurrent FIFO Queue based on the paper from Phillippas Tsigas and Yi Zhangs:
* www.cse.chalmers.se/~tsigas/papers/latest-spaa01.pdf */
#define MAX_WORK_QUEUE_JOBS (1024)
#define WORK_QUEUE_MASK (MAX_WORK_QUEUE_JOBS-1)
struct scheduler;
typedef void(*job_callback)(struct scheduler*,void*);
#define JOB_ENTRY_POINT(name) static void name(struct scheduler *sched, void *arg)
#define BASE_ALIGN(x) __attribute__((aligned(x)))
#define QUEUE_EMPTY 0
#define QUEUE_REMOVED 1
struct job {
void *data;
job_callback callback;
volatile u32 *run_count;
};
struct job_queue {
volatile u32 head;
struct job *jobs[MAX_WORK_QUEUE_JOBS];
volatile u32 tail;
};
static void
job_queue_init(struct job_queue *q)
{
memset(q, 0, sizeof(*q));
q->jobs[0] = QUEUE_EMPTY;
q->head = 0;
q->tail = 1;
}
static int
job_queue_entry_free(struct job *p)
{
return (((uintptr_t)p == QUEUE_EMPTY) || ((uintptr_t)p == QUEUE_REMOVED));
}
static int
job_queue_push(struct job_queue *q, struct job *job)
{
while (1) {
/* read tail */
u32 te = q->tail;
u32 ate = te;
struct job *tt = q->jobs[ate];
u32 tmp = (ate + 1) & WORK_QUEUE_MASK;
struct job *tnew;
/* we want to find the actual tail */
while (!(job_queue_entry_free(tt))) {
/* check tails consistency */
if (te != q->tail) goto retry;
/* check if queue is full */
if (tmp == q->head) break;
tt = q->jobs[tmp];
ate = tmp;
tmp = (ate + 1) & WORK_QUEUE_MASK;
}
/* check tails consistency */
if (te != q->tail) continue;
/* check if queue is full */
if (tmp == q->head) {
ate = (tmp + 1) & WORK_QUEUE_MASK;
tt = q->jobs[ate];
if (!(job_queue_entry_free(tt)))
return 0; /* queue is full */
/* let pop update header */
__sync_bool_compare_and_swap(&q->head, tmp, ate);
continue;
}
if ((uintptr_t)tt == QUEUE_REMOVED)
job = (struct job*)((uintptr_t)job | 0x01);
if (te != q->tail) continue;
if (__sync_bool_compare_and_swap(&q->jobs[ate], tt, job)) {
if ((tmp & 1) == 0)
__sync_bool_compare_and_swap(&q->tail, te, tmp);
return 1;
}
retry:;
}
}
static int
job_queue_pop(struct job **job, struct job_queue *q)
{
while (1) {
u32 th = q->head;
u32 tmp = (th + 1) & WORK_QUEUE_MASK;
struct job *tt = q->jobs[tmp];
struct job *tnull = 0;
/* we want to find the actual head */
while ((job_queue_entry_free(tt))) {
if (th != q->head) goto retry;
if (tmp == q->tail) return 0;
tmp = (tmp + 1) & WORK_QUEUE_MASK;
tt = q->jobs[tmp];
}
/* check head's consistency */
if (th != q->head) continue;
/* check if queue is empty */
if (tmp == q->tail) {
/* help push to update end */
__sync_bool_compare_and_swap(&q->tail, tmp, (tmp+1) & WORK_QUEUE_MASK);
continue; /* retry */
}
tnull = (((uintptr_t)tt & 0x01) ? (struct job*)QUEUE_REMOVED: (struct job*)QUEUE_EMPTY);
if (th != q->head) continue;
/* get actual head */
if (__sync_bool_compare_and_swap(&q->jobs[tmp], tt, tnull)) {
if ((tmp & 0x1) == 0)
__sync_bool_compare_and_swap(&q->head, th, tmp);
*job = (struct job*)((uintptr_t)tt & ~(uintptr_t)1);
return 1;
}
retry:;
}
}
/* --------------------------------------------------------------
*
* SCHEDULER
*
* --------------------------------------------------------------*/
typedef volatile u32 job_counter;
enum job_queue_ids {
JOB_QUEUE_LOW,
JOB_QUEUE_NORMAL,
JOB_QUEUE_HIGH,
JOB_QUEUE_COUNT
};
struct scheduler_fiber {
struct scheduler_fiber *next;
struct scheduler_fiber *prev;
void *stack;
size_t stack_size;
struct sys_fiber handle;
struct job job;
u32 value;
};
struct scheduler_worker {
int id;
pthread_t thread;
struct scheduler_fiber *context;
struct scheduler *sched;
};
typedef void (*scheduler_profiler_callback_f)(void*, int thread_id);
struct scheduler_profiling {
void *userdata;
scheduler_profiler_callback_f thread_start;
scheduler_profiler_callback_f thread_stop;
scheduler_profiler_callback_f context_switch;
scheduler_profiler_callback_f wait_start;
scheduler_profiler_callback_f wait_stop;
};
struct scheduler {
struct sem work_sem;
struct job_queue queue[JOB_QUEUE_COUNT];
int worker_count;
struct scheduler_worker worker[MAX_SCHEDULER_WORKER];
volatile u32 worker_running;
volatile u32 worker_active;
struct scheduler_profiling profiler;
int fiber_count;
struct scheduler_fiber fibers[MAX_SCHEDULER_FIBERS];
volatile u32 wait_list_lock;
struct scheduler_fiber *wait_list;
volatile u32 free_list_lock;
struct scheduler_fiber *free_list;
};
static struct job
Job(job_callback callback, void *data)
{
struct job task;
task.callback = callback;
task.data = data;
task.run_count = 0;
return task;
}
static void
scheduler_hook_into_list(struct scheduler_fiber **list,
struct scheduler_fiber *element, volatile u32 *lock)
{
spinlock_begin(lock);
if (!*list) {
*list = element;
element->prev = 0;
element->next = 0;
} else {
element->prev = 0;
element->next = *list;
(*list)->prev = element;
*list = element;
}
spinlock_end(lock);
}
static void
scheduler_unhook_from_list(struct scheduler_fiber **list,
struct scheduler_fiber *element, volatile u32 *lock)
{
if (lock) spinlock_begin(lock);
if (element->next)
element->next->prev = element->prev;
if (element->prev)
element->prev->next = element->next;
if (*list == element)
*list = element->next;
element->next = element->prev = 0;
if (lock) spinlock_end(lock);
}
static struct scheduler_fiber*
scheduler_find_fiber_finished_waiting(struct scheduler *s)
{
struct scheduler_fiber *iter = s->wait_list;
spinlock_begin(&s->wait_list_lock);
while (iter) {
if (*iter->job.run_count == iter->value) break;
iter = iter->next;
}
if (iter) scheduler_unhook_from_list(&s->wait_list, iter, 0);
spinlock_end(&s->wait_list_lock);
return iter;
}
static struct scheduler_fiber*
scheduler_get_free_fiber(struct scheduler *s)
{
struct scheduler_fiber *fib = 0;
spinlock_begin(&s->free_list_lock);
if (s->fiber_count < MAX_SCHEDULER_FIBERS) {
fib = &s->fibers[s->fiber_count++];
} else if (s->free_list) {
fib = s->free_list;
scheduler_unhook_from_list(&s->free_list, fib, 0);
}
spinlock_end(&s->free_list_lock);
return fib;
}
static void
scheduler_run(struct scheduler *s, enum job_queue_ids q, struct job *jobs,
u32 count, job_counter *counter)
{
u32 jobIndex = 0;
struct job_queue *queue;
assert(q < JOB_QUEUE_COUNT);
assert(counter);
assert(jobs);
assert(s);
queue = &s->queue[q];
while (jobIndex < count) {
jobs[jobIndex].run_count = counter;
if (job_queue_push(queue, &jobs[jobIndex])) {
sem_post(&s->work_sem, 1);
jobIndex++;
}
}
*counter = count;
}
static void
scheduler_fiber_proc(void *arg)
{
struct scheduler_worker *w = (struct scheduler_worker*)arg;
struct scheduler *s = w->sched;
__sync_add_and_fetch(&s->worker_active, 1);
while (1) {
/* check if any fiber is done waiting */
struct scheduler_fiber *fiber = scheduler_find_fiber_finished_waiting(s);
if (fiber) {
/* put old worker context into freelist */
struct scheduler_fiber *old = w->context;
memset(w->context, 0, sizeof(*w->context));
scheduler_hook_into_list(&s->free_list, old, &s->free_list_lock);
/* set previously waiting fiber as worker context */
w->context = fiber;
if (s->profiler.context_switch)
s->profiler.context_switch(s->profiler.userdata, w->id);
fiber_switch_to(&old->handle, &w->context->handle);
}
/* check if any new jobs inside work queues */
{struct job *job = 0;
if (!(job_queue_pop(&job, &s->queue[JOB_QUEUE_HIGH])) &&
!(job_queue_pop(&job, &s->queue[JOB_QUEUE_NORMAL])) &&
!(job_queue_pop(&job, &s->queue[JOB_QUEUE_LOW]))) {
/* currently no job so wait */
__sync_sub_and_fetch(&s->worker_active, 1);
if (s->profiler.wait_start)
s->profiler.wait_start(s->profiler.userdata, w->id);
sem_wait(&s->work_sem, 1);
if (s->profiler.wait_stop)
s->profiler.wait_stop(s->profiler.userdata, w->id);
__sync_add_and_fetch(&s->worker_active, 1);
} else {
/* run dequeued job */
w->context->job = *job;
assert(job->callback);
if (s->profiler.thread_start)
s->profiler.thread_start(s->profiler.userdata, w->id);
job->callback(s, job->data);
if (s->profiler.thread_stop)
s->profiler.thread_stop(s->profiler.userdata, w->id);
__sync_sub_and_fetch(job->run_count, 1);
}}
}
}
static void*
thread_proc(void *arg)
{
struct scheduler_worker *w = (struct scheduler_worker*)arg;
struct scheduler_fiber *fiber;
struct scheduler *s = w->sched;
__sync_add_and_fetch(&s->worker_running, 1);
/* create dummy fiber */
fiber = scheduler_get_free_fiber(s);
assert(fiber);
getcontext(&fiber->handle.fib);
fiber->handle.fib.uc_link = 0;
fiber->handle.fib.uc_stack.ss_size = 0;
fiber->handle.fib.uc_stack.ss_sp = 0;
w->context = fiber;
scheduler_fiber_proc(w);
return 0;
}
static int
scheduler_self_id(struct scheduler *s)
{
int worker_index = 0;
pthread_t self = pthread_self();
for (worker_index; worker_index < s->worker_count; ++worker_index) {
if (s->worker[worker_index].thread == self)
return worker_index;
}
return -1;
}
static struct scheduler_worker*
scheduler_self(struct scheduler *s)
{
int worker_index = scheduler_self_id(s);
if (worker_index < 0) return 0;
return &s->worker[worker_index];
}
static void
scheduler_wait_for(struct scheduler *s, job_counter *counter, u32 value)
{
struct scheduler_worker *w;
struct scheduler_fiber *old;
assert(s);
assert(counter);
/* find threads own worker state */
w = scheduler_self(s);
assert(w);
/* insert current context into waiting list */
old = w->context;
w->context->value = value;
w->context->job.run_count = counter;
scheduler_hook_into_list(&s->wait_list, old, &s->wait_list_lock);
/*either continue finished waiting job or start new one */
w->context = scheduler_find_fiber_finished_waiting(s);
if (!w->context) {
w->context = scheduler_get_free_fiber(s);
assert(w->context);
fiber_create(&w->context->handle, w->context->stack,
w->context->stack_size, scheduler_fiber_proc, w);
}
if (s->profiler.context_switch)
s->profiler.context_switch(s->profiler.userdata, w->id);
fiber_switch_to(&old->handle, &w->context->handle);
}
static void
sched_init(struct scheduler *sched, size_t worker_count)
{
size_t thread_index = 0;
pthread_attr_t attr;
assert(sched);
assert(worker_count);
memset(sched, 0, sizeof(*sched));
sched->worker_count = (int)worker_count;
/* init semeaphores */
sem_init(&sched->work_sem, 0);
job_queue_init(&sched->queue[0]);
job_queue_init(&sched->queue[1]);
job_queue_init(&sched->queue[2]);
/* init fibers */
{int fiber_index = 0;
for (fiber_index; fiber_index < MAX_SCHEDULER_FIBERS; ++fiber_index) {
struct scheduler_fiber *fiber = sched->fibers + fiber_index;
fiber->stack_size = FIBER_STACK_SIZE;
fiber->stack = calloc(fiber->stack_size, 1);
}}
/* start worker threads */
pthread_attr_init(&attr);
for (thread_index; thread_index < worker_count-1; ++thread_index) {
cpu_set_t cpus;
sched->worker[thread_index].id = (int)thread_index;
sched->worker[thread_index].sched = sched;
/* bind thread to core */
CPU_ZERO(&cpus);
CPU_SET(thread_index, &cpus);
pthread_attr_setaffinity_np(&attr, sizeof(cpu_set_t), &cpus);
/* start worker thread */
pthread_create(&sched->worker[thread_index].thread, &attr,
thread_proc, &sched->worker[thread_index]);
pthread_detach(sched->worker[thread_index].thread);
}
/* initialize main thread as worker thread */
{cpu_set_t cpus;
CPU_ZERO(&cpus);
CPU_SET(thread_index, &cpus);
sched->worker[thread_index].sched = sched;
sched->worker[thread_index].thread = pthread_self();
sched->worker[thread_index].id = (int)thread_index;
pthread_setaffinity_np(sched->worker[thread_index].thread, sizeof(cpu_set_t), &cpus);}
/* create fiber for main thread worker */
{struct scheduler_fiber *fiber;
fiber = scheduler_get_free_fiber(sched);
assert(fiber);
getcontext(&fiber->handle.fib);
fiber->handle.fib.uc_link = 0;
fiber->handle.fib.uc_stack.ss_size = 0;
fiber->handle.fib.uc_stack.ss_sp = 0;
sched->worker[thread_index].context = fiber;}
pthread_attr_destroy(&attr);
}
static void
sched_free(struct scheduler *sched)
{
/* free fibers stack */
{int fiber_index = 0;
for (fiber_index; fiber_index < MAX_SCHEDULER_FIBERS; ++fiber_index) {
struct scheduler_fiber *fiber = sched->fibers + fiber_index;
free(fiber->stack);
}}
sem_free(&sched->work_sem);
}
/* --------------------------------------------------------------
*
* TEST
*
* --------------------------------------------------------------*/
struct game_state {
int data;
struct memory memory;
struct allocator game_alloc;
struct allocator gpu_alloc;
struct allocator render_alloc;
};
struct test_data {
struct allocator *alloc;
int *data;
int from;
int to;
};
JOB_ENTRY_POINT(test_work)
{
int i;
void *mem;
struct test_data *data = arg;
mem = alloc(data->alloc, scheduler_self_id(sched), 1024);
for (i = data->from; i < data->to; ++i)
data->data[i] = i;
printf("sleep begin\n");
sleep(1);
printf("sleep end\n");
}
JOB_ENTRY_POINT(root)
{
struct game_state *game = arg;
job_counter counter = 0;
struct job jobs[8];
struct test_data data[8];
int i, n[2*1024];
printf("root\n");
for (i = 0; i < 8; ++i) {
data[i].alloc = (i&1) ? &game->game_alloc: &game->render_alloc;
data[i].data = n;
data[i].from = i * 256;
data[i].to = (i+1)*256;
jobs[i] = Job(test_work, &data);
}
scheduler_run(sched, JOB_QUEUE_HIGH, jobs, 8, &counter);
printf("run\n");
scheduler_wait_for(sched, &counter, 0);
mem_free(&game->memory, MEMORY_TAG_GAME);
mem_free(&game->memory, MEMORY_TAG_RENDER);
mem_free(&game->memory, MEMORY_TAG_GPU);
printf("done\n");
}
int main(int argc, char **argv)
{
/* setup app memory and allocator */
struct game_state app;
size_t thread_count = (size_t)sysconf(_SC_NPROCESSORS_ONLN);
memset(&app, 0, sizeof(app));
mem_init(&app.memory);
alloctor_init(&app.game_alloc, thread_count, &app.memory, MEMORY_TAG_GAME);
alloctor_init(&app.render_alloc, thread_count, &app.memory, MEMORY_TAG_RENDER);
alloctor_init(&app.gpu_alloc, thread_count, &app.memory, MEMORY_TAG_GPU);
/* start root process */
{struct scheduler sched;
struct job job = Job(root, &app);
job_counter counter;
sched_init(&sched, thread_count);
printf("init\n");
scheduler_run(&sched, JOB_QUEUE_HIGH, &job, 1, &counter);
printf("run\n");
scheduler_wait_for(&sched, &counter, 0);
printf("finished\n");}
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment