Last active
June 12, 2023 18:22
-
-
Save odecaux/6b540a1812870eb1aeb6de96f636e438 to your computer and use it in GitHub Desktop.
Single writer RCU
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
struct rcu_t; | |
struct rcu_reader_t { | |
rcu_reader_t *previous; | |
rcu_reader_t *next; | |
rcu_t *obj; | |
c89atomic_uint64 min_epoch; | |
}; | |
struct rcu_zombie_t { | |
void *value; | |
c89atomic_uint64 epoch_when_retired; | |
}; | |
struct rcu_t { | |
void *value; | |
rcu_reader_t *readers_front; | |
rcu_zombie_t *zombies; | |
size_t zombie_count; | |
size_t zombie_cap; | |
c89atomic_uint64 current_epoch; | |
}; | |
// Builds an rcu_t in its initial state: no published value, no registered
// readers, an empty zombie array with room for 1024 entries, and the epoch
// counter at 1 (readers use 0 to mean "not holding a value").
//
// NOTE(review): the malloc result is not checked here; a failed allocation
// leaves `zombies` null while `zombie_cap` still reports 1024.
rcu_t rcu_create_default()
{
    const size_t initial_zombie_cap = 1024;

    rcu_t result{}; // value-initialised, so `value` starts out as nullptr
    result.current_epoch = 1;
    result.readers_front = nullptr;
    result.zombie_cap = initial_zombie_cap;
    result.zombie_count = 0;
    result.zombies = (rcu_zombie_t*)malloc(sizeof(rcu_zombie_t) * initial_zombie_cap);
    return result;
}
// Publishes new_value as the current value of obj and records the value it
// replaces as a zombie, stamped with the epoch that was current before this
// update. The zombie is handed back to the caller later by rcu_reclaim.
//
// obj        the RCU object to update.
// new_value  the pointer to publish; must differ from the currently published
//            one (asserted).
//
// Room for the zombie record is secured BEFORE the new value is published, so
// an allocation failure can never leave a replaced value untracked. If the
// zombie array cannot be grown the process is aborted: the function has no
// error channel, and the previous behaviour was a write through a null
// pointer.
//
// NOTE(review): this assumes a single updating thread (the zombie array is
// modified without synchronisation). Under that assumption the exchange and
// the epoch increment below cannot interleave with another update, so
// old_value and old_epoch always belong to the same update.
void rcu_update(rcu_t *obj, void *new_value)
{
    if (obj->zombie_count == obj->zombie_cap)
    {
        // Doubling a capacity of 0 would stay 0, so seed it instead.
        size_t grown_cap = (obj->zombie_cap != 0) ? obj->zombie_cap * 2 : 1024;
        // Keep the old pointer until realloc succeeds so it is never lost.
        rcu_zombie_t *grown_zombies = (rcu_zombie_t*)realloc(obj->zombies, sizeof(rcu_zombie_t) * grown_cap);
        if (grown_zombies == nullptr)
            abort();
        obj->zombies = grown_zombies;
        obj->zombie_cap = grown_cap;
    }

    void *old_value = c89atomic_exchange_ptr(&obj->value, new_value);
    assert(new_value != old_value); //user error, not our fault really
    uint64_t old_epoch = c89atomic_fetch_add_64(&obj->current_epoch, 1);

    obj->zombies[obj->zombie_count++] = rcu_zombie_t { old_value, old_epoch };
}
// Hands back retired values that no reader can still be using.
//
// A zombie is considered in use while any registered reader has a non-zero
// min_epoch that is less than or equal to the zombie's retirement epoch.
//
// obj              the RCU object whose zombie array is scanned.
// out_buffer       receives the reclaimed pointers, oldest first; only
//                  written when at least one value is reclaimed.
// out_buffer_size  capacity of out_buffer, in pointers.
//
// Returns the number of pointers written to out_buffer. Zombies that are
// still in use, or that did not fit in out_buffer, stay in the array in their
// original order for a later call. Zombies whose value is nullptr carry
// nothing to hand back and are simply dropped; they no longer stop the scan,
// so entries that follow them are reclaimed in the same call.
//
// NOTE(review): the zombie array and the reader list are walked without any
// locking, so this is presumably meant to run on the thread that calls
// rcu_update / rcu_create_reader / rcu_destroy_reader — confirm with callers.
size_t rcu_reclaim(rcu_t *obj, void **out_buffer, size_t out_buffer_size)
{
    size_t reclaimed_object_count = 0;
    size_t kept_count = 0; // write cursor for in-place compaction

    for (size_t zombie_idx = 0; zombie_idx < obj->zombie_count; zombie_idx++)
    {
        const rcu_zombie_t zombie = obj->zombies[zombie_idx];
        if (zombie.value == nullptr)
            continue; // nothing to return for this entry; drop it

        // Once the output buffer is full every remaining zombie is kept,
        // without consulting the readers.
        bool must_keep = (reclaimed_object_count >= out_buffer_size);
        for (rcu_reader_t *reader = obj->readers_front;
             !must_keep && reader != nullptr;
             reader = reader->next)
        {
            uint64_t reader_epoch = c89atomic_load_64(&reader->min_epoch);
            must_keep = (reader_epoch != 0 && reader_epoch <= zombie.epoch_when_retired);
        }

        if (must_keep)
            obj->zombies[kept_count++] = zombie; // kept_count <= zombie_idx, so this never clobbers unread entries
        else
            out_buffer[reclaimed_object_count++] = zombie.value;
    }

    obj->zombie_count = kept_count;
    return reclaimed_object_count;
}
// Tears down an rcu_t and returns the pointer that was published last, so
// the caller can dispose of it.
//
// The caller is expected to have destroyed every reader and reclaimed every
// zombie beforehand (both are asserted). When asserts are compiled out, any
// reader nodes still on the list are freed here.
// On return *obj is reset to an all-zero state.
void * rcu_destroy(rcu_t *obj)
{
    assert(obj->readers_front == nullptr);
    assert(obj->zombie_count == 0);

    // Release whatever reader nodes remain on the list.
    rcu_reader_t *cursor = obj->readers_front;
    while (cursor != nullptr)
    {
        rcu_reader_t *following = cursor->next; // read before the node is freed
        free(cursor);
        cursor = following;
    }

    free(obj->zombies);

    void *final_value = obj->value;
    *obj = rcu_t{};
    return final_value;
}
// Allocates a reader for obj and pushes it onto the front of obj's reader
// list. The new reader starts idle (min_epoch == 0).
//
// Returns the new reader, or nullptr if the allocation failed; in that case
// obj's reader list is left untouched. Release the reader with
// rcu_destroy_reader.
//
// NOTE(review): the list is modified without synchronisation, so this is
// presumably not meant to run concurrently with rcu_reclaim or with other
// reader creation/destruction on the same obj — confirm with callers.
rcu_reader_t *rcu_create_reader(rcu_t *obj)
{
    rcu_reader_t *new_reader = (rcu_reader_t*) malloc(sizeof(rcu_reader_t));
    if (new_reader == nullptr)
        return nullptr;

    new_reader->obj = obj;
    new_reader->min_epoch = 0;
    new_reader->previous = nullptr;
    new_reader->next = obj->readers_front;

    if (obj->readers_front != nullptr)
    {
        // The current head must not have a predecessor.
        assert(obj->readers_front->previous == nullptr);
        obj->readers_front->previous = new_reader;
    }
    obj->readers_front = new_reader;
    return new_reader;
}
// Unlinks reader from the reader list of the rcu_t it was created for and
// frees it. The pointer must not be used afterwards.
void rcu_destroy_reader(rcu_reader_t *reader)
{
    rcu_t *const owner = reader->obj;
    rcu_reader_t *const before = reader->previous;
    rcu_reader_t *const after = reader->next;

    // If this node is the head, its successor becomes the new head.
    if (owner->readers_front == reader)
    {
        assert(before == nullptr);
        owner->readers_front = after;
    }

    // Stitch the two neighbours together around the removed node.
    if (before != nullptr)
    {
        before->next = after;
    }
    if (after != nullptr)
    {
        after->previous = before;
    }

    free(reader);
}
// Begins a read: records the current epoch in reader->min_epoch, then loads
// and returns the currently published pointer. The reader must be idle
// (min_epoch == 0, asserted) and must call rcu_release_value when done.
//
// The epoch is stored before the pointer is loaded. rcu_reclaim keeps any
// zombie whose retirement epoch is >= a reader's non-zero min_epoch, so a
// value retired after that store stays out of the reclaimed set until
// rcu_release_value.
// NOTE(review): this reasoning presumably relies on the c89atomic calls used
// here being sequentially consistent — confirm against the c89atomic docs.
void *rcu_acquire_value(rcu_reader_t *reader)
{
    assert(reader->min_epoch == 0);

    rcu_t *const shared = reader->obj;
    const uint64_t observed_epoch = c89atomic_load_64(&shared->current_epoch);
    c89atomic_store_64(&reader->min_epoch, observed_epoch);
    assert(reader->min_epoch > 0);

    void *const published = c89atomic_load_ptr(&shared->value);
    return published;
}
// Ends a read started with rcu_acquire_value by resetting reader->min_epoch
// to 0, the value rcu_reclaim ignores. The reader must currently be holding
// a value (min_epoch > 0, asserted).
void rcu_release_value(rcu_reader_t *reader)
{
    const uint64_t idle_epoch = 0;

    assert(reader->min_epoch > 0);
    c89atomic_store_64(&reader->min_epoch, idle_epoch);
}
// Minimal payload type used by rcu_test as the RCU-protected value.
struct test_t
{
    int val; // arbitrary marker so the test can tell instances apart
};
void rcu_test() | |
{ | |
rcu_t obj = rcu_create_default(); | |
assert(obj.value == nullptr); | |
assert(obj.readers_front == nullptr); | |
assert(obj.zombie_count == 0); | |
assert(obj.current_epoch > 0); | |
rcu_reader_t *reader_a = rcu_create_reader(&obj); | |
rcu_reader_t *reader_b = rcu_create_reader(&obj); | |
{ | |
test_t *value = (test_t*)rcu_acquire_value(reader_a); | |
assert(value == nullptr); | |
rcu_release_value(reader_a); | |
} | |
test_t value_a = { 10 }; | |
rcu_update(&obj, &value_a); | |
{ | |
test_t *reclaim_value_buffer[32] = {}; | |
size_t reclaimed_value_count = rcu_reclaim(&obj, (void**)reclaim_value_buffer, 32); | |
assert(reclaimed_value_count == 0); | |
} | |
test_t value_b = { 11 }; | |
rcu_update(&obj, &value_b); | |
{ | |
test_t *reclaim_value_buffer[32] = {}; | |
size_t reclaimed_value_count = rcu_reclaim(&obj, (void**)reclaim_value_buffer, 32); | |
assert(reclaimed_value_count == 1); | |
assert(reclaim_value_buffer[0] == &value_a); | |
} | |
{ | |
test_t *value = (test_t*)rcu_acquire_value(reader_a); | |
assert(value == &value_b); | |
rcu_release_value(reader_a); | |
} | |
test_t *value_b_ptr = (test_t*)rcu_acquire_value(reader_a); | |
assert(value_b_ptr == &value_b); | |
test_t value_c = { 12 }; | |
rcu_update(&obj, &value_c); | |
test_t *value_c_ptr = (test_t*)rcu_acquire_value(reader_b); | |
assert(value_c_ptr == &value_c); | |
assert(value_b_ptr->val == 11); | |
assert(value_c_ptr->val == 12); | |
rcu_release_value(reader_a); | |
rcu_release_value(reader_b); | |
rcu_destroy_reader(reader_a); | |
rcu_destroy_reader(reader_b); | |
{ | |
test_t *reclaim_value_buffer[32] = {}; | |
size_t reclaimed_value_count = rcu_reclaim(&obj, (void**)reclaim_value_buffer, 32); | |
assert(reclaimed_value_count == 1); | |
assert(reclaim_value_buffer[0] == &value_b); | |
assert(obj.value == &value_c); | |
} | |
#define UPDATE_COUNT 100000 | |
rcu_reader_t **readers = (rcu_reader_t**)malloc(sizeof(rcu_reader_t*) * UPDATE_COUNT); | |
for (int i = 0; i < UPDATE_COUNT; i++) | |
{ | |
readers[i] = rcu_create_reader(&obj); | |
} | |
test_t *values = (test_t*)malloc(sizeof(test_t) * UPDATE_COUNT); | |
std::thread main_thread { [&] { | |
for (int i = 0; i < UPDATE_COUNT; i++) | |
{ | |
rcu_update(&obj, &values[i]); | |
} | |
} }; | |
std::thread audio_thread { [&] { | |
for (int i = 0; i < UPDATE_COUNT; i++) | |
{ | |
rcu_acquire_value(readers[i]); | |
} | |
for (int i = 0; i < UPDATE_COUNT; i++) | |
{ | |
rcu_release_value(readers[i]); | |
} | |
} }; | |
main_thread.join(); | |
audio_thread.join(); | |
for (int i = 0; i < UPDATE_COUNT; i++) | |
{ | |
rcu_destroy_reader(readers[i]); | |
} | |
{ | |
test_t *reclaim_value_buffer[UPDATE_COUNT] = {}; | |
size_t reclaimed_value_count = rcu_reclaim(&obj, (void**)reclaim_value_buffer, UPDATE_COUNT); | |
printf("%llu\n", reclaimed_value_count); | |
//assert(reclaimed_value_count == UPDATE_COUNT); | |
} | |
test_t *last_value = (test_t*)rcu_destroy(&obj); | |
free(readers); | |
free(values); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment