Skip to content

Instantly share code, notes, and snippets.

@odecaux
Last active June 12, 2023 18:22
Show Gist options
  • Save odecaux/6b540a1812870eb1aeb6de96f636e438 to your computer and use it in GitHub Desktop.
Save odecaux/6b540a1812870eb1aeb6de96f636e438 to your computer and use it in GitHub Desktop.
Single writer RCU
// Forward declaration: each reader keeps a back-pointer to the rcu_t it is registered with.
struct rcu_t;
// Per-reader registration record, linked into the owning rcu_t's doubly linked reader list.
struct rcu_reader_t {
// Neighbour closer to the list head; nullptr when this reader is the head.
rcu_reader_t *previous;
// Neighbour further from the list head; nullptr when this reader is the tail.
rcu_reader_t *next;
// The rcu_t this reader was created for (set by rcu_create_reader).
rcu_t *obj;
// Epoch published by rcu_acquire_value while a value is held; 0 means the reader holds nothing.
c89atomic_uint64 min_epoch;
};
// A value that rcu_update has replaced and that is waiting to be handed back by rcu_reclaim.
// Field order matters: rcu_update brace-initializes this struct as { value, epoch }.
struct rcu_zombie_t {
// Pointer that was stored in rcu_t::value before the update that retired it.
void *value;
// Value of rcu_t::current_epoch observed by rcu_update just before it incremented the counter.
c89atomic_uint64 epoch_when_retired;
};
// Single-writer RCU cell: one published pointer plus the bookkeeping needed to decide
// when previously published pointers are no longer held by any reader.
struct rcu_t {
// Currently published value; swapped atomically by rcu_update, loaded atomically by rcu_acquire_value.
void *value;
// Head of the doubly linked list of registered readers (nullptr when there are none).
rcu_reader_t *readers_front;
// malloc'ed array of retired values awaiting rcu_reclaim.
rcu_zombie_t *zombies;
// Number of entries of `zombies` currently in use.
size_t zombie_count;
// Allocated capacity of `zombies`, in entries.
size_t zombie_cap;
// Epoch counter: initialised to 1 by rcu_create_default and incremented by every rcu_update.
// Readers use 0 in their min_epoch to mean "not reading", so a live epoch is never 0.
c89atomic_uint64 current_epoch;
};
// Creates an empty RCU cell: no published value, no registered readers, no zombies,
// epoch counter at 1, and room for 1024 retired values before the zombie array grows.
//
// The returned object owns a malloc'ed zombie array; release it with rcu_destroy.
// Aborts the process if that array cannot be allocated, instead of returning an
// object whose first rcu_update would write through a null pointer.
rcu_t rcu_create_default()
{
    // Value-initialisation zeroes every field: value, readers_front and zombies are
    // nullptr, zombie_count is 0.
    rcu_t new_obj{};
    new_obj.zombie_cap = 1024;
    new_obj.zombies = (rcu_zombie_t*)malloc(sizeof(rcu_zombie_t) * new_obj.zombie_cap);
    if (new_obj.zombies == nullptr)
    {
        // There is no error channel in the return type, so fail loudly and early.
        abort();
    }
    // Epoch 0 is reserved by readers to mean "not reading", so counting starts at 1.
    new_obj.current_epoch = 1;
    return new_obj;
}
// Publishes `new_value` as the current value of `obj` and queues the previous value
// as a zombie so that rcu_reclaim can hand it back once no reader can still hold it.
//
// obj       - cell to update; only one thread may call rcu_update / rcu_reclaim on it,
//             because the zombie array is modified without any synchronisation.
// new_value - pointer to publish; must differ from the currently published pointer (asserted).
//
// Aborts the process if the zombie array has to grow and the allocation fails.
void rcu_update(rcu_t *obj, void *new_value)
{
    // Reserve the zombie slot before touching the published value, so an allocation
    // failure is detected while the cell is still in its previous, consistent state.
    if (obj->zombie_count == obj->zombie_cap)
    {
        // A zero capacity (e.g. a zero-initialised rcu_t) must still be able to grow.
        size_t grown_cap = obj->zombie_cap != 0 ? obj->zombie_cap * 2 : 16;
        // Keep the old pointer until realloc succeeds; assigning the result directly
        // would leak the array and then write through nullptr on failure.
        rcu_zombie_t *grown = (rcu_zombie_t*)realloc(obj->zombies, sizeof(rcu_zombie_t) * grown_cap);
        if (grown == nullptr)
            abort();
        obj->zombies = grown;
        obj->zombie_cap = grown_cap;
    }

    void *old_value = c89atomic_exchange_ptr(&obj->value, new_value);
    assert(new_value != old_value); //user error, not our fault really

    // The value is swapped first and the epoch bumped second. With a single writer,
    // old_value and old_epoch cannot get out of sync: no other thread modifies either.
    uint64_t old_epoch = c89atomic_fetch_add_64(&obj->current_epoch, 1);

    // A null previous value (first update, or a previously published nullptr) has
    // nothing to reclaim, so it is not queued.
    if (old_value != nullptr)
        obj->zombies[obj->zombie_count++] = rcu_zombie_t { old_value, old_epoch };
}
// Hands back retired values that no active reader can still be holding.
//
// obj             - cell whose zombie list is scanned. The zombie array and the reader
//                   list are accessed without locks, so this must not run concurrently
//                   with rcu_update, rcu_create_reader or rcu_destroy_reader on `obj`.
// out_buffer      - receives the reclaimed pointers, in retirement order.
// out_buffer_size - capacity of out_buffer, in pointers.
//
// Returns the number of pointers written to out_buffer. Zombies that are still in use,
// or that do not fit in out_buffer, stay queued for a later call.
size_t rcu_reclaim(rcu_t *obj, void **out_buffer, size_t out_buffer_size)
{
    // Find the oldest epoch any active reader is pinned to, once, instead of walking
    // the reader list again for every zombie. A min_epoch of 0 marks an idle reader.
    // A reader that becomes active after this snapshot loads its value after every
    // zombie examined below was already swapped out, so it cannot be holding one.
    bool has_active_reader = false;
    uint64_t oldest_active_epoch = 0;
    for (rcu_reader_t *reader = obj->readers_front; reader != nullptr; reader = reader->next)
    {
        uint64_t reader_epoch = c89atomic_load_64(&reader->min_epoch);
        if (reader_epoch == 0)
            continue;
        if (!has_active_reader || reader_epoch < oldest_active_epoch)
            oldest_active_epoch = reader_epoch;
        has_active_reader = true;
    }

    // Single pass: reclaimed entries go to out_buffer, survivors are compacted to the
    // front of the array in their original order.
    size_t reclaimed_object_count = 0;
    size_t kept_count = 0;
    for (size_t zombie_idx = 0; zombie_idx < obj->zombie_count; zombie_idx++)
    {
        rcu_zombie_t zombie = obj->zombies[zombie_idx];

        // An entry without a value has nothing to hand back; drop it and keep
        // scanning rather than stopping the whole pass.
        if (zombie.value == nullptr)
            continue;

        // A reader pinned at epoch E may hold any value retired at epoch >= E.
        bool is_any_reader_using_epoch =
            has_active_reader && oldest_active_epoch <= zombie.epoch_when_retired;

        if (!is_any_reader_using_epoch && reclaimed_object_count < out_buffer_size)
            out_buffer[reclaimed_object_count++] = zombie.value;
        else
            obj->zombies[kept_count++] = zombie;
    }
    obj->zombie_count = kept_count;
    return reclaimed_object_count;
}
// Tears down `obj` and returns the value that was published last, so the caller can
// dispose of it. All readers must already be destroyed and all zombies reclaimed
// (both asserted). Any reader records still linked are freed, the zombie array is
// freed, and `*obj` is reset to an all-zero state.
void * rcu_destroy(rcu_t *obj)
{
    assert(obj->readers_front == nullptr);
    assert(obj->zombie_count == 0);

    free(obj->zombies);

    // Walk the reader list, remembering the successor before each node is released.
    rcu_reader_t *cursor = obj->readers_front;
    while (cursor != nullptr)
    {
        rcu_reader_t *following = cursor->next;
        free(cursor);
        cursor = following;
    }

    // Capture the published pointer before the struct is wiped.
    void *final_value = obj->value;
    *obj = {};
    return final_value;
}
// Allocates a reader record and links it at the front of `obj`'s reader list.
//
// obj - cell the reader will read from. The list is modified without locks, so this
//       must not run concurrently with rcu_reclaim, rcu_destroy_reader or another
//       rcu_create_reader on the same cell.
//
// Returns the new reader in the idle state (min_epoch == 0), to be released with
// rcu_destroy_reader, or nullptr if the allocation fails (the list is left untouched).
rcu_reader_t *rcu_create_reader(rcu_t *obj)
{
    rcu_reader_t *new_reader = (rcu_reader_t*) malloc(sizeof(rcu_reader_t));
    if (new_reader == nullptr)
        return nullptr;

    // Fully initialise the node before it becomes reachable from the list.
    new_reader->obj = obj;
    new_reader->min_epoch = 0;
    new_reader->previous = nullptr;
    new_reader->next = obj->readers_front;

    if (obj->readers_front != nullptr)
    {
        // The current head must not have a predecessor.
        assert(obj->readers_front->previous == nullptr);
        obj->readers_front->previous = new_reader;
    }
    obj->readers_front = new_reader;
    return new_reader;
}
// Unlinks `reader` from its owning cell's reader list and frees it.
// The pointer is invalid after this call.
void rcu_destroy_reader(rcu_reader_t *reader)
{
    rcu_t *owner = reader->obj;
    rcu_reader_t *before = reader->previous;
    rcu_reader_t *after = reader->next;

    // If this reader is the list head, the head moves to its successor.
    if (owner->readers_front == reader)
    {
        assert(before == nullptr);
        owner->readers_front = after;
    }

    // Stitch the two neighbours together around the removed node.
    if (before != nullptr)
        before->next = after;
    if (after != nullptr)
        after->previous = before;

    free(reader);
}
// Begins a read-side section for `reader` and returns the currently published value.
// The reader must not already hold a value (asserted); call rcu_release_value once the
// returned pointer is no longer needed.
void *rcu_acquire_value(rcu_reader_t *reader)
{
// Why the statement order matters: the epoch is published *before* the value is loaded.
// rcu_update swaps the value first and bumps the epoch afterwards, so any value this
// function can still observe gets retired with an epoch >= the one published here, and
// rcu_reclaim does not hand back zombies whose retirement epoch is >= an active
// reader's min_epoch.
// NOTE(review): this argument assumes c89atomic_store_64 / c89atomic_load_ptr are
// sequentially consistent, so the store below cannot be reordered after the final
// load - confirm against the c89atomic documentation.
assert(reader->min_epoch == 0);
uint64_t current_epoch = c89atomic_load_64(&reader->obj->current_epoch);
c89atomic_store_64(&reader->min_epoch, current_epoch);
// rcu_create_default starts the epoch at 1 and rcu_update only increments it, so a
// published epoch is never the "idle" marker 0.
assert(reader->min_epoch > 0);
return c89atomic_load_ptr(&reader->obj->value);
}
// Ends the read-side section opened by rcu_acquire_value: the reader's published epoch
// is reset to 0, which rcu_reclaim treats as "holds nothing". The reader must currently
// hold a value (asserted).
void rcu_release_value(rcu_reader_t *reader)
{
    c89atomic_uint64 *published_epoch = &reader->min_epoch;
    assert(*published_epoch > 0);
    c89atomic_store_64(published_epoch, 0);
}
// Minimal payload type that rcu_test publishes through the RCU cell.
struct test_t {
// Marker value that lets the test tell one payload from another.
int val;
};
// Self-test for the RCU cell.
//
// Part 1 (single-threaded) checks creation, acquire/release, and that rcu_reclaim hands
// back exactly the values that have been replaced.
// Part 2 runs one writer thread against one reader thread, then verifies that every
// replaced value is reclaimed once all readers have released and been destroyed.
void rcu_test()
{
    rcu_t obj = rcu_create_default();
    assert(obj.value == nullptr);
    assert(obj.readers_front == nullptr);
    assert(obj.zombie_count == 0);
    assert(obj.current_epoch > 0);

    rcu_reader_t *reader_a = rcu_create_reader(&obj);
    rcu_reader_t *reader_b = rcu_create_reader(&obj);
    assert(reader_a != nullptr);
    assert(reader_b != nullptr);

    // Nothing published yet: readers observe nullptr.
    {
        test_t *value = (test_t*)rcu_acquire_value(reader_a);
        assert(value == nullptr);
        rcu_release_value(reader_a);
    }

    // First publication replaces nullptr, so there is nothing to reclaim.
    test_t value_a = { 10 };
    rcu_update(&obj, &value_a);
    {
        // A void* array is used directly: rcu_reclaim writes void* elements, and
        // reinterpreting a test_t*[] as void** would be an aliasing violation.
        void *reclaim_value_buffer[32] = {};
        size_t reclaimed_value_count = rcu_reclaim(&obj, reclaim_value_buffer, 32);
        assert(reclaimed_value_count == 0);
        (void)reclaimed_value_count;
    }

    // Second publication retires value_a; no reader is active, so it is reclaimable.
    test_t value_b = { 11 };
    rcu_update(&obj, &value_b);
    {
        void *reclaim_value_buffer[32] = {};
        size_t reclaimed_value_count = rcu_reclaim(&obj, reclaim_value_buffer, 32);
        assert(reclaimed_value_count == 1);
        assert(reclaim_value_buffer[0] == &value_a);
        (void)reclaimed_value_count;
    }
    {
        test_t *value = (test_t*)rcu_acquire_value(reader_a);
        assert(value == &value_b);
        rcu_release_value(reader_a);
    }

    // reader_a keeps value_b across an update; reader_b acquires afterwards and sees value_c.
    test_t *value_b_ptr = (test_t*)rcu_acquire_value(reader_a);
    assert(value_b_ptr == &value_b);
    test_t value_c = { 12 };
    rcu_update(&obj, &value_c);
    test_t *value_c_ptr = (test_t*)rcu_acquire_value(reader_b);
    assert(value_c_ptr == &value_c);
    assert(value_b_ptr->val == 11);
    assert(value_c_ptr->val == 12);
    rcu_release_value(reader_a);
    rcu_release_value(reader_b);
    rcu_destroy_reader(reader_a);
    rcu_destroy_reader(reader_b);
    {
        void *reclaim_value_buffer[32] = {};
        size_t reclaimed_value_count = rcu_reclaim(&obj, reclaim_value_buffer, 32);
        assert(reclaimed_value_count == 1);
        assert(reclaim_value_buffer[0] == &value_b);
        assert(obj.value == &value_c);
        (void)reclaimed_value_count;
    }

#define UPDATE_COUNT 100000
    rcu_reader_t **readers = (rcu_reader_t**)malloc(sizeof(rcu_reader_t*) * UPDATE_COUNT);
    assert(readers != nullptr);
    for (int i = 0; i < UPDATE_COUNT; i++)
    {
        readers[i] = rcu_create_reader(&obj);
        assert(readers[i] != nullptr);
    }

    // Only the addresses of these elements are published; their contents are never read.
    test_t *values = (test_t*)malloc(sizeof(test_t) * UPDATE_COUNT);
    assert(values != nullptr);

    // Writer: publishes every element of `values` in turn.
    std::thread main_thread { [&] {
        for (int i = 0; i < UPDATE_COUNT; i++)
        {
            rcu_update(&obj, &values[i]);
        }
    } };
    // Reader side: every reader acquires once, then every reader releases.
    std::thread audio_thread { [&] {
        for (int i = 0; i < UPDATE_COUNT; i++)
        {
            rcu_acquire_value(readers[i]);
        }
        for (int i = 0; i < UPDATE_COUNT; i++)
        {
            rcu_release_value(readers[i]);
        }
    } };
    main_thread.join();
    audio_thread.join();

    for (int i = 0; i < UPDATE_COUNT; i++)
    {
        rcu_destroy_reader(readers[i]);
    }
    {
        // Heap-allocated: UPDATE_COUNT pointers (~800 KB) is too large to put on the
        // stack safely.
        void **reclaim_value_buffer = (void**)malloc(sizeof(void*) * UPDATE_COUNT);
        assert(reclaim_value_buffer != nullptr);
        size_t reclaimed_value_count = rcu_reclaim(&obj, reclaim_value_buffer, UPDATE_COUNT);
        // %zu is the matching conversion for size_t.
        printf("%zu\n", reclaimed_value_count);
        // value_c plus values[0 .. UPDATE_COUNT-2] were replaced, i.e. UPDATE_COUNT
        // pointers, and no reader exists any more, so all of them are reclaimable.
        assert(reclaimed_value_count == UPDATE_COUNT);
        free(reclaim_value_buffer);
    }

    // The last published value is the final element written by the writer thread.
    test_t *last_value = (test_t*)rcu_destroy(&obj);
    assert(last_value == &values[UPDATE_COUNT - 1]);
    (void)last_value;

    free(readers);
    free(values);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment