Skip to content

Instantly share code, notes, and snippets.

@rikusalminen
Last active August 27, 2017 08:15
Show Gist options
  • Save rikusalminen/5693101 to your computer and use it in GitHub Desktop.
Save rikusalminen/5693101 to your computer and use it in GitHub Desktop.
Multi-core game loop. Triple buffer, periodically updated by N threads in parallel, readers-writers locking.
/* Feature-test macros must be defined before the first system header. */
#ifdef __linux__
#define _GNU_SOURCE
#define __USE_GNU
#endif
#include <assert.h>
#include <errno.h>
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#ifdef __linux__
#include <sched.h>
#endif
#include <pthread.h>
#include <unistd.h>
struct looper_worker;

/*
 * Shared state of the frame loop. Every function in this file that touches
 * these fields does so while holding `lock`.
 */
struct looper
{
    /* Synchronisation primitives. */
    pthread_mutex_t lock;           /* guards every field below */
    pthread_cond_t state_changed;   /* worker threads wait on this in looper_wait() */
    pthread_cond_t can_read;        /* blocked looper_read() callers wait here */
    pthread_cond_t can_write;       /* blocked looper_write() callers wait here */

    /* Run control. */
    int stopped;                    /* set to 1 by looper_stop() */
    uint64_t interval;              /* frame period in nanoseconds; 0 = no pacing */
    uint64_t start_time;            /* CLOCK_MONOTONIC nanoseconds captured by looper_start() */
    uint64_t frame;                 /* current frame number */
    uint64_t start_frame;           /* value of `frame` when looper_start() was last called */
    uint64_t end_frame;             /* 0 = run without limit, otherwise bounds the frames run */

    /* Worker bookkeeping for the current frame. */
    int num_waiting;                /* workers currently inside looper_wait() */
    int num_started;                /* workers that have begun the current frame */
    int num_finished;               /* workers that have completed the current frame */

    /* External writer / reader bookkeeping. */
    int writers_waiting;            /* callers blocked in looper_write() */
    int writers;                    /* callers between looper_write() and looper_write_end() */
    int readers_waiting;            /* callers blocked in looper_read() */
    int readers;                    /* callers between looper_read() and looper_read_end() */
    uint64_t read_frame;            /* frame number reported to readers */
    uint64_t write_frame;           /* frame number reported to the writer */

    /* Buffer indices. */
    int read_buffer;                /* buffer handed to readers */
    int write_buffer;               /* buffer handed to the writer */
    int temp_buffer;                /* spare buffer */
    int src_buffer;                 /* input buffer of the frame being computed */
    int dst_buffer;                 /* output buffer of the frame being computed */

    /* Worker threads. */
    int num_threads;                /* number of entries in `threads` */
    pthread_t *threads;             /* heap array allocated by looper_create() */
};
/* Per-worker record: the thread handle, the loop it belongs to and its index. */
struct looper_worker
{
    pthread_t thread;       /* handle of the worker thread */
    struct looper *looper;  /* loop this worker belongs to */
    int worker_id;          /* index of the worker */
};
/*
 * Start (or resume) the frame loop.
 *
 * interval   - frame period in nanoseconds; 0 disables pacing.
 * num_frames - number of frames to run; 0 means "no limit" (end_frame = 0).
 *
 * Has no effect once the looper has been stopped. Records the current
 * CLOCK_MONOTONIC time as the pacing origin and wakes every thread waiting
 * on state_changed.
 *
 * Returns 0 on success, -1 if the monotonic clock could not be read (in
 * which case the looper state is left untouched).
 */
int looper_start(struct looper *looper, uint64_t interval, uint64_t num_frames)
{
    int result = 0;

    pthread_mutex_lock(&looper->lock);
    if(!looper->stopped)
    {
        struct timespec timestamp;
        if(clock_gettime(CLOCK_MONOTONIC, &timestamp) != 0)
        {
            result = -1;
        }
        else
        {
            /* Widen before multiplying: time_t may be narrower than 64 bits,
             * and seconds * 1e9 would then overflow. */
            looper->start_time =
                (uint64_t)timestamp.tv_sec * UINT64_C(1000000000) +
                (uint64_t)timestamp.tv_nsec;
            looper->interval = interval;
            looper->start_frame = looper->frame;
            looper->end_frame = num_frames == 0 ? 0 : looper->frame + num_frames + 1;
            pthread_cond_broadcast(&looper->state_changed);
        }
    }
    pthread_mutex_unlock(&looper->lock);
    return result;
}
/*
 * Pause the frame loop by setting end_frame to the current frame, or to the
 * frame after it when some workers have started the current frame but not
 * all of them have finished it. Does nothing once the looper is stopped.
 * Always returns 0.
 */
int looper_pause(struct looper *looper)
{
    pthread_mutex_lock(&looper->lock);
    if(!looper->stopped)
    {
        uint64_t last_frame = looper->frame;
        if(looper->num_started != looper->num_finished)
        {
            last_frame += 1;
        }
        looper->end_frame = last_frame;
    }
    pthread_mutex_unlock(&looper->lock);
    return 0;
}
/*
 * Stop the looper for good: set the stopped flag and wake every thread
 * blocked on any of the three condition variables so it can observe the
 * flag. Always returns 0.
 */
int looper_stop(struct looper *looper)
{
    pthread_cond_t *const wake_list[] = {
        &looper->state_changed,
        &looper->can_read,
        &looper->can_write,
    };
    const size_t wake_count = sizeof wake_list / sizeof wake_list[0];

    pthread_mutex_lock(&looper->lock);
    looper->stopped = 1;
    for(size_t idx = 0; idx < wake_count; ++idx)
    {
        pthread_cond_broadcast(wake_list[idx]);
    }
    pthread_mutex_unlock(&looper->lock);
    return 0;
}
/*
 * Acquire read access to the read buffer.
 *
 * Blocks while a writer is active or waiting, unless the looper is stopped.
 * On success stores the frame number and buffer index handed to readers in
 * *frame and *buffer and returns 1; the caller must later call
 * looper_read_end(). Returns 0 (outputs untouched) when the looper has been
 * stopped.
 */
int looper_read(struct looper *looper, uint64_t *frame, int *buffer)
{
    int acquired = 0;

    pthread_mutex_lock(&looper->lock);

    ++looper->readers_waiting;
    for(;;)
    {
        if(looper->stopped)
            break;
        if(looper->writers_waiting <= 0 && looper->writers <= 0)
            break;
        pthread_cond_wait(&looper->can_read, &looper->lock);
    }
    --looper->readers_waiting;

    if(!looper->stopped)
    {
        assert(looper->writers == 0 && looper->writers_waiting == 0);

        /* The first reader in switches to the newest source buffer; the
         * buffer previously read becomes the spare one. */
        const int first_reader = (looper->readers == 0);
        if(first_reader && looper->read_buffer != looper->src_buffer)
        {
            looper->temp_buffer = looper->read_buffer;
            looper->read_buffer = looper->src_buffer;
            looper->read_frame = looper->frame - 1;
        }

        ++looper->readers;
        *frame = looper->read_frame;
        *buffer = looper->read_buffer;
        acquired = 1;
    }

    pthread_mutex_unlock(&looper->lock);
    return acquired;
}
/*
 * Release read access obtained with looper_read().
 *
 * The reader count is always decremented. If the looper is still running
 * and this was the last reader, the read buffer is moved to the current
 * source buffer (when no writer is active and they differ), and one waiting
 * writer is signalled if no worker has started the current frame.
 *
 * Returns 1 while the looper is running, 0 once it has been stopped.
 */
int looper_read_end(struct looper *looper)
{
    pthread_mutex_lock(&looper->lock);

    --looper->readers;
    const int running = !looper->stopped;

    if(running && looper->readers == 0)
    {
        const int stale = (looper->read_buffer != looper->src_buffer);
        if(looper->writers == 0 && stale)
        {
            looper->temp_buffer = looper->read_buffer;
            looper->read_buffer = looper->src_buffer;
            looper->read_frame = looper->frame - 1;
        }

        if(looper->writers_waiting && looper->num_started == 0)
        {
            pthread_cond_signal(&looper->can_write);
        }
    }

    pthread_mutex_unlock(&looper->lock);
    return running ? 1 : 0;
}
/*
 * Acquire exclusive write access to the current source buffer.
 *
 * Blocks (unless the looper is stopped) while any worker has started the
 * current frame, while readers hold the source buffer, or while another
 * writer is active. On success stores the frame number and buffer index in
 * *frame and *buffer and returns 1; the caller must later call
 * looper_write_end(). Returns 0 (outputs untouched) when the looper has been
 * stopped.
 */
int looper_write(struct looper *looper, uint64_t *frame, int *buffer)
{
    int acquired = 0;

    pthread_mutex_lock(&looper->lock);

    ++looper->writers_waiting;
    for(;;)
    {
        if(looper->stopped)
            break;

        const int frame_in_progress = looper->num_started > 0;
        const int source_being_read =
            looper->readers > 0 && looper->read_buffer == looper->src_buffer;
        const int writer_active = looper->writers > 0;
        if(!(frame_in_progress || source_being_read || writer_active))
            break;

        pthread_cond_wait(&looper->can_write, &looper->lock);
    }
    --looper->writers_waiting;

    if(!looper->stopped)
    {
        assert(looper->writers == 0);
        ++looper->writers;

        looper->write_frame = looper->frame - 1;
        looper->write_buffer = looper->src_buffer;

        /* Let a worker waiting on state_changed re-evaluate its condition. */
        pthread_cond_signal(&looper->state_changed);

        *buffer = looper->write_buffer;
        *frame = looper->write_frame;

        assert(looper->writers == 1);
        assert(looper->read_buffer != looper->write_buffer || looper->readers == 0);
        assert(looper->num_started == 0);
        acquired = 1;
    }

    pthread_mutex_unlock(&looper->lock);
    return acquired;
}
/*
 * Release write access obtained with looper_write().
 *
 * The writer count is always decremented. While the looper is running:
 *  - if another writer is waiting, exactly one is signalled (hand-over);
 *  - otherwise every waiting reader is woken (readers run concurrently, so
 *    waking a single one would strand the rest until some later write), and
 *    the worker threads parked on state_changed are woken as well, since
 *    they wait for the writer count to drop to zero and nothing else would
 *    wake them when only readers had been signalled.
 *
 * Returns 1 while the looper is running, 0 once it has been stopped.
 */
int looper_write_end(struct looper *looper)
{
    pthread_mutex_lock(&looper->lock);
    looper->writers -= 1;
    int result = 0;
    if(!looper->stopped)
    {
        assert(looper->writers == 0);
        if(looper->writers_waiting > 0)
        {
            pthread_cond_signal(&looper->can_write);
        }
        else
        {
            if(looper->readers_waiting > 0)
                pthread_cond_broadcast(&looper->can_read);
            /* Every waiter re-checks its predicate under the lock, so a
             * broadcast is safe. */
            pthread_cond_broadcast(&looper->state_changed);
        }
        result = 1;
    }
    pthread_mutex_unlock(&looper->lock);
    return result;
}
// Frame barrier for worker threads.
//
// A worker calls this with *frame set to the frame it has just completed
// (0 on its very first call). The function records the completion, rotates
// the buffers once every worker has finished, then blocks until the next
// frame may begin.
//
// On success returns 1 and stores the new frame number and the source and
// destination buffer indices in *frame, *src_buffer and *dst_buffer.
// Returns 0, leaving the outputs untouched, once the looper has been stopped.
static int looper_wait(struct looper* looper, uint64_t *frame, int *src_buffer, int *dst_buffer)
{
pthread_mutex_lock(&looper->lock);
looper->num_waiting += 1;
if(looper->num_started == 0)
{
// this must be the first frame
assert(looper->frame == 1 && *frame == 0);
} else if(!looper->stopped && *frame != 0)
{
// The caller has completed a frame: count it as finished.
looper->num_finished += 1;
if(looper->num_finished == looper->num_threads)
{
// Last worker to finish: advance the frame counter, reset the
// per-frame counters and rotate the buffers.
assert(looper->num_finished == looper->num_started);
looper->frame += 1;
looper->num_started = 0;
looper->num_finished = 0;
if(looper->read_buffer == looper->src_buffer)
{
// The old source is also the read buffer, so it is not reused as the
// next destination; the spare buffer is used instead.
looper->src_buffer = looper->dst_buffer;
looper->dst_buffer = looper->temp_buffer;
} else
{
// Otherwise source and destination simply swap roles.
int swap_buffer = looper->src_buffer;
looper->src_buffer = looper->dst_buffer;
looper->dst_buffer = swap_buffer;
}
assert(looper->dst_buffer != looper->read_buffer || looper->readers == 0);
assert(looper->dst_buffer != looper->src_buffer);
// Between frames: give one waiting writer a chance if no reader is active.
if(looper->writers_waiting > 0 && looper->readers == 0)
pthread_cond_signal(&looper->can_write);
}
}
// Block until the next frame may start or the looper is stopped.
while(!looper->stopped)
{
// The timed path is taken only when the shared frame counter is exactly
// one ahead of the caller's frame, the run is not bounded by end_frame
// (0 means unlimited) and no writer is active.
if(*frame + 1 == looper->frame &&
(looper->end_frame == 0 || looper->end_frame > looper->frame + 1)
&& looper->writers == 0
// disabled condition: && looper->writers_waiting == 0
)
{
// Absolute deadline in nanoseconds: start_time plus one interval per
// frame elapsed since start_frame.
uint64_t next_frame_time =
looper->start_time +
looper->interval * (looper->frame - looper->start_frame);
struct timespec abstime = {
next_frame_time / 1000000000,
next_frame_time % 1000000000
};
// With interval == 0 there is no pacing and the wait is skipped.
if(looper->interval == 0 ||
pthread_cond_timedwait(&looper->state_changed, &looper->lock, &abstime) == ETIMEDOUT)
{
// A writer took over while this thread waited: re-evaluate from the top.
if(looper->writers > 0)
continue;
// First worker to start this frame wakes the others.
if(looper->num_started == 0)
pthread_cond_broadcast(&looper->state_changed);
break;
}
// Woken before the deadline: loop and re-check the conditions.
} else
{
// Not allowed to start yet: wait for a state change.
pthread_cond_wait(&looper->state_changed, &looper->lock);
}
}
looper->num_waiting -= 1;
int result = 0;
if(!looper->stopped)
{
// Register this worker as started and hand out the frame parameters.
looper->num_started += 1;
*frame = looper->frame;
*src_buffer = looper->src_buffer;
*dst_buffer = looper->dst_buffer;
result = 1;
assert(looper->writers == 0);
assert(looper->dst_buffer != looper->read_buffer || looper->readers == 0);
assert(looper->dst_buffer != looper->src_buffer);
}
pthread_mutex_unlock(&looper->lock);
return result;
}
/* Index of the CPU the calling thread runs on, or -1 where unavailable. */
static int looper_current_cpu(void)
{
#ifdef __linux__
    return sched_getcpu();
#else
    return -1;
#endif
}

/*
 * Entry point of a worker thread. `arg` is the struct looper the worker
 * belongs to.
 *
 * Repeatedly waits for the next frame in looper_wait(), logs the frame and
 * buffer indices, then simulates 8 ms of work. Exits when looper_wait()
 * stops returning 1 and returns the looper pointer.
 */
static void *looper_main(void *arg)
{
    struct looper *looper = (struct looper*)arg;
    printf("looper_main on core: %d\n", looper_current_cpu());
    uint64_t frame = 0, prev_frame = 0;
    int src_buffer, dst_buffer;
    while(looper_wait(looper, &frame, &src_buffer, &dst_buffer) == 1)
    {
        /* Frames must be handed out consecutively to each worker. */
        assert(frame == prev_frame + 1);
        prev_frame = frame;
        assert(src_buffer != dst_buffer);
        /* PRIu64 instead of %lu: uint64_t is not unsigned long everywhere. */
        printf("%d: %" PRIu64 "\t(%d -> %d)\n",
               looper_current_cpu(), frame, src_buffer, dst_buffer);
        struct timespec delay = { 0, 1000*1000*8 };
        nanosleep(&delay, NULL);
    }
    return looper;
}
/*
 * Initialise `looper` and spawn its worker threads.
 *
 * num_cpus    - number of CPUs to use.
 * hyperthread - non-zero to run one worker on every second CPU only.
 *
 * The looper starts paused (frame == end_frame == 1); call looper_start()
 * to run it. At least one worker is always requested. On Linux each worker
 * is pinned to its own CPU.
 *
 * Returns 0 when every worker was created. Returns -1 if the thread array
 * could not be allocated or if some threads could not be created; in the
 * latter case num_threads is reduced to the number of threads actually
 * running, so the looper remains consistent and can still be stopped and
 * joined.
 */
int looper_create(struct looper *looper, int num_cpus, int hyperthread)
{
    int wanted = hyperthread ? (num_cpus / 2) : num_cpus;
    if(wanted < 1)
        wanted = 1;

    looper->frame = 1;
    looper->end_frame = 1;
    looper->read_buffer = 0;
    looper->write_buffer = 1;
    looper->temp_buffer = 2;
    looper->src_buffer = 0;
    looper->dst_buffer = 1;

    /* Set up all synchronisation objects before anything can fail, so the
     * looper is always safe to stop and join. */
    pthread_mutex_init(&looper->lock, NULL);
    pthread_condattr_t condattr;
    pthread_condattr_init(&condattr);
    /* state_changed is used with absolute CLOCK_MONOTONIC deadlines. */
    pthread_condattr_setclock(&condattr, CLOCK_MONOTONIC);
    pthread_cond_init(&looper->state_changed, &condattr);
    pthread_condattr_destroy(&condattr);
    /* These two are only ever waited on without a timeout. */
    pthread_cond_init(&looper->can_read, NULL);
    pthread_cond_init(&looper->can_write, NULL);

    looper->threads = calloc((size_t)wanted, sizeof *looper->threads);
    if(looper->threads == NULL)
    {
        looper->num_threads = 0;
        return -1;
    }
    looper->num_threads = wanted;

    int created = 0;
    for(int i = 0; i < wanted; ++i)
    {
        pthread_attr_t thread_attr;
        pthread_attr_init(&thread_attr);
#ifdef __linux__
        /* A real cpu_set_t avoids the undefined shift of `1 << i` for
         * i >= 31 and works on machines with more than 32 CPUs. */
        cpu_set_t cpuset;
        CPU_ZERO(&cpuset);
        CPU_SET(hyperthread ? i*2 : i, &cpuset);
        if(pthread_attr_setaffinity_np(&thread_attr, sizeof(cpuset), &cpuset) != 0)
        {
            printf("error setting thread attribute\n");
        }
#endif
        void *looper_arg = looper;
        if(pthread_create(&looper->threads[created], &thread_attr, looper_main, looper_arg) != 0)
        {
            printf("error creating thread\n");
        }
        else
        {
            ++created;
        }
        pthread_attr_destroy(&thread_attr);
    }

    if(created != wanted)
    {
        /* Workers compare num_finished against num_threads under the lock;
         * keep the count equal to the threads that really exist. */
        pthread_mutex_lock(&looper->lock);
        looper->num_threads = created;
        pthread_mutex_unlock(&looper->lock);
        return -1;
    }
    return 0;
}
/*
 * Wait for every worker thread to exit and release the thread array.
 * Call looper_stop() first, otherwise the workers never exit.
 *
 * Returns 0 if every join succeeded, -1 otherwise.
 */
int looper_join(struct looper *looper)
{
    int result = 0;
    for(int i = 0; i < looper->num_threads; ++i)
    {
        void *ret;
        if(pthread_join(looper->threads[i], &ret) != 0)
            result = -1;
    }
    free(looper->threads);
    /* Prevent a dangling pointer and a double free on a repeated call. */
    looper->threads = NULL;
    looper->num_threads = 0;
    return result;
}
/*
 * Test client thread. `arg` is the struct looper to exercise.
 *
 * Performs up to 100 iterations. Each one sleeps for a random time below
 * 100 ms, then randomly either reads or writes, holding the access for a
 * tenth of that time. Exits early as soon as any looper call reports that
 * the looper has been stopped. Always returns NULL.
 *
 * NOTE(review): rand() is called concurrently from several test threads;
 * POSIX does not require it to be thread-safe.
 */
void *test_thread_main(void *arg)
{
    struct looper *looper = (struct looper*)arg;
    for(int i = 0 ; i < 100; ++i)
    {
        struct timespec delay = {
            0,
            (rand() % 100) * 1000 * 1000
        };
        nanosleep(&delay, NULL);
        /* Hold the buffer for a tenth of the idle time. */
        delay.tv_nsec /= 10;
        uint64_t frame;
        int buffer;
        if(rand() < RAND_MAX/2)
        {
            if(looper_read(looper, &frame, &buffer) != 1)
                break;
            printf("read frame: %" PRIu64 " buffer: %d\n", frame, buffer);
            nanosleep(&delay, NULL);
            if(looper_read_end(looper) != 1)
                break;
            printf("read frame: %" PRIu64 " buffer: %d end\n", frame, buffer);
        } else
        {
            if(looper_write(looper, &frame, &buffer) != 1)
                break;
            printf("write frame: %" PRIu64 " buffer: %d\n", frame, buffer);
            nanosleep(&delay, NULL);
            if(looper_write_end(looper) != 1)
                break;
            printf("write frame: %" PRIu64 " buffer: %d end\n", frame, buffer);
        }
    }
    /* The function is declared to return void *, so it must return a value. */
    return NULL;
}
/*
 * Demo driver: creates a looper with one worker per online CPU, runs it at
 * a 10 ms frame interval, exercises it from four test client threads, then
 * pauses, resumes, pauses again, stops and joins everything.
 */
int main(void)
{
    srand((unsigned)time(NULL));

    /* sysconf() returns -1 on error; fall back to a single core. */
    long online = sysconf(_SC_NPROCESSORS_ONLN);
    int num_cores = online < 1 ? 1 : (int)online;
    printf("%d cores\n", num_cores);

    struct looper looper = { 0 };
    if(looper_create(&looper, num_cores, 0) != 0)
    {
        fprintf(stderr, "looper_create failed\n");
        looper_stop(&looper);
        looper_join(&looper);
        return EXIT_FAILURE;
    }

    struct timespec delay = { 0, 500 * 1000 * 1000 };
    printf("start\n");
    looper_start(&looper, 1000 * 1000 * 10, 0);
    //looper_start(&looper, 0, 0);
    nanosleep(&delay, NULL);

    /* enum constant instead of `const int`, so the array is not a VLA. */
    enum { NUM_TEST_THREADS = 4 };
    pthread_t test_threads[NUM_TEST_THREADS];
    int num_test_threads = 0;
    for(int i = 0; i < NUM_TEST_THREADS; ++i)
    {
        /* Only successfully created threads are recorded and joined later. */
        if(pthread_create(&test_threads[num_test_threads], NULL, test_thread_main, (void*)&looper) == 0)
            ++num_test_threads;
        else
            fprintf(stderr, "error creating test thread\n");
    }
#if 0
    /* Disabled manual scenario: sequential writes followed by sequential reads. */
    nanosleep(&delay, NULL);
    for(int i = 0; i < 5; ++i)
    {
        printf("write\n");
        uint64_t write_frame;
        int write_buffer;
        looper_write(&looper, &write_frame, &write_buffer);
        printf("write frame: %" PRIu64 " buffer: %d\n", write_frame, write_buffer);
        nanosleep(&delay, NULL);
        printf("write end\n");
        looper_write_end(&looper);
        nanosleep(&delay, NULL);
    }
    nanosleep(&delay, NULL);
    for(int i = 0; i < 5; ++i)
    {
        printf("read\n");
        uint64_t read_frame;
        int read_buffer;
        looper_read(&looper, &read_frame, &read_buffer);
        printf("read frame: %" PRIu64 " buffer: %d\n", read_frame, read_buffer);
        nanosleep(&delay, NULL);
        printf("read end\n");
        looper_read_end(&looper);
        //nanosleep(&delay, NULL);
    }
    nanosleep(&delay, NULL);
#endif
    printf("pause\n");
    looper_pause(&looper);
    nanosleep(&delay, NULL);
    printf("start\n");
    looper_start(&looper, 1000 * 1000 * 10, 0);
    nanosleep(&delay, NULL);
    printf("pause\n");
    looper_pause(&looper);
    nanosleep(&delay, NULL);
    printf("stop\n");
    looper_stop(&looper);
    for(int i = 0; i < num_test_threads; ++i)
        pthread_join(test_threads[i], NULL);
    looper_join(&looper);
    return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment