////////////////////////////////
// NBHS - Non-blocking hashset
////////////////////////////////
// You wanna intern lots of things on lots of cores? this is for you. It's
// inspired by Cliff Click's non-blocking hashmap.
#ifndef NBHS_H
#define NBHS_H

#include <stddef.h>
#include <stdint.h>
#include <stdbool.h>
#include <stdatomic.h>

typedef void* (*NBHS_AllocZeroMem)(size_t size);
typedef void  (*NBHS_FreeMem)(void* ptr, size_t size);
typedef uint32_t (*NBHS_Hash)(const void* a);
typedef bool (*NBHS_Compare)(const void* a, const void* b);

typedef struct NBHS_Table NBHS_Table;
typedef struct {
    NBHS_AllocZeroMem alloc_mem;
    NBHS_FreeMem free_mem;
    NBHS_Compare compare;
    NBHS_Hash hash;
    _Atomic(NBHS_Table*) latest;
} NBHS;

NBHS  nbhs_alloc(size_t initial_cap, NBHS_AllocZeroMem alloc_mem, NBHS_FreeMem free_mem, NBHS_Compare compare, NBHS_Hash hash);
void  nbhs_free(NBHS* hs);
void* nbhs_intern(NBHS* hs, void* val);
#endif // NBHS_H
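
// Rough usage sketch (my_hash/my_cmp here are the same callbacks the test
// driver at the bottom of this gist defines; the set stores your pointers
// directly, it never copies, so interned values must outlive the set):
//
//   NBHS set = nbhs_alloc(32, NULL, NULL, my_cmp, my_hash); // NULL allocators => calloc/free
//   static uint32_t ten = 10;
//   uint32_t* canon = nbhs_intern(&set, &ten); // == &ten if it was new, else the prior copy
//   nbhs_free(&set);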
#ifdef NBHS_IMPL

#include <stdlib.h>
#include <assert.h>

// personal debooging stuff (flip to #if 1 to compile out the Spall hooks)
#if 0
#define NBHS__BEGIN(name)
#define NBHS__END()
#else
#define NBHS__BEGIN(name) spall_auto_buffer_begin(#name, sizeof(#name) - 1, NULL, 0)
#define NBHS__END()       spall_auto_buffer_end()
#endif

// private constants (idk if i wanna undef these but don't touch i guess?)
#define NBHS_RESERVE_BIT (1ull << 63ull)
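
// entries in NBHS_Table.data are raw value pointers with bit 63 borrowed as a
// "reserved" flag: a thread CASes in (val | NBHS_RESERVE_BIT) to claim a slot,
// checks the older table for a duplicate, then publishes the final pointer with
// the bit cleared. readers that match a reserved entry spin until it resolves.
// this assumes user pointers never have bit 63 set, which holds for user-space
// allocations on mainstream 64-bit platforms.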
struct NBHS_Table {
    _Atomic(NBHS_Table*) prev;

    uint32_t exp;

    // tracks how many entries have
    // been moved once we're resizing
    _Atomic uint32_t moved;
    _Atomic uint32_t move_done;
    _Atomic uint32_t count;

    _Atomic(uintptr_t) data[];
};

// single hazard pointer per thread
typedef struct NBHS_HazardEntry {
    _Atomic(struct NBHS_HazardEntry*) next;
    // normal insertion and work:
    //   0 - latest
    //   1 - prev
    // inside the prev moving code, it does an insert so
    // it needs separate hazard entries:
    //   2 - latest
    //   3 - prev
    // misc:
    //   4 - temporary
    _Atomic(NBHS_Table*) ptr[5];
} NBHS_HazardEntry;
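
// hazard pointer protocol: before touching a table, a thread publishes the
// table pointer into one of its slots (see nbhs_hazard_access) and clears it
// when done. the thread that detaches an old table during a resize walks
// nbhs_hazard_list and spins until no slot anywhere still names that table,
// at which point freeing it is safe.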
static _Thread_local bool nbhs_hazard_init;
static _Thread_local NBHS_HazardEntry nbhs_hazard;
static _Atomic(NBHS_HazardEntry*) nbhs_hazard_list;

static void* nbhs__alloc_zero_mem(size_t s) { return calloc(1, s); }
static void nbhs__free_mem(void* ptr, size_t s) { free(ptr); }

static void* nbhs_try_entry(NBHS* hs, NBHS_Table* table, size_t i, void* val, uintptr_t entry) {
    void* entry_p = (void*) (entry & ~NBHS_RESERVE_BIT);
    if (hs->compare(entry_p, val)) {
        if ((entry & NBHS_RESERVE_BIT) == 0) {
            // not reserved, yay
            return entry_p;
        } else {
            // reserved by a matching insert, spin until it resolves into a
            // complete entry
            uintptr_t decision;
            while (decision = atomic_load(&table->data[i]), decision & NBHS_RESERVE_BIT) {
                // idk if i should do a nicer wait than a spin-lock
            }
            return (void*) decision;
        }
    } else {
        return NULL;
    }
}
static void* nbhs_raw_lookup(NBHS* hs, NBHS_Table* table, uint32_t h, void* val) {
    size_t mask = (1ull << table->exp) - 1;
    size_t first = h & mask, i = first;
    do {
        uintptr_t entry = atomic_load(&table->data[i]);
        if (entry == 0) {
            return NULL;
        }

        void* k = nbhs_try_entry(hs, table, i, val, entry);
        if (k != NULL) {
            return k;
        }
        i = (i + 1) & mask;
    } while (i != first);
    return NULL;
}
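
// note the early-out on a 0 entry: this is an intern set with no deletion, so
// slots only ever go 0 -> reserved -> final pointer. a 0 slot therefore proves
// the value never landed anywhere along this probe chain, and the linear probe
// can stop without scanning the whole table.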
static NBHS_Table* nbhs_hazard_access(_Atomic(NBHS_Table*)* table, int slot) {
    NBHS_Table* val = atomic_load(table);
    for (;;) {
        // mark as hazard
        nbhs_hazard.ptr[slot] = val;

        // check if it's been invalidated since the previous load, if so
        // then undo hazard and try load again.
        NBHS_Table* after = atomic_load(table);
        if (val == after) {
            return val;
        }
        val = after;
    }
}
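
// the publish-then-recheck loop above is what makes the hazard pointer sound:
// if the table pointer is unchanged after our slot becomes visible (both
// accesses are seq_cst), any thread freeing that table scans the hazard list
// after its detach and will see our entry; if the pointer did change, we may
// have published a stale value and must retry.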
static void* nbhs_raw_intern(NBHS* hs, NBHS_Table* latest, void* val, int hazard_slot) {
    // resize on 50% load factor and we're not in the moving process rn
    uint32_t threshold = (1ull << latest->exp) / 2;
    if (latest->count >= threshold && atomic_load(&latest->prev) == NULL) {
        // make resized table, we'll amortize the moves upward
        size_t new_cap = 1ull << (latest->exp + 1);
        NBHS_Table* new_top = hs->alloc_mem(sizeof(NBHS_Table) + new_cap*sizeof(uintptr_t));
        new_top->exp = latest->exp + 1;

        // CAS latest -> new_table, if another thread wins the race we'll use its table
        new_top->prev = latest;
        if (!atomic_compare_exchange_strong(&hs->latest, &latest, new_top)) {
            hs->free_mem(new_top, sizeof(NBHS_Table) + new_cap*sizeof(uintptr_t));
        } else {
            latest = new_top;
        }
        nbhs_hazard.ptr[hazard_slot] = latest;
    }

    // actually lookup & insert
    uint32_t h = hs->hash(val);
    uintptr_t reserved_form = (uintptr_t)val | NBHS_RESERVE_BIT;

    void* result = NULL;
    for (;;) {
        size_t mask = (1ull << latest->exp) - 1;
        size_t first = h & mask, i = first;
        do {
            uintptr_t entry = atomic_load(&latest->data[i]);
            if (entry == 0) {
                // if we spot a NULL, the entry has never been in this table, that doesn't mean
                // it's not appeared already so we'll only reserve the slot until we figure that
                // out.
                //
                // everyone else will see our pointer marked as reserved and is free to wait on
                // us if they match, if not they can move along.
                if (atomic_compare_exchange_strong(&latest->data[i], &entry, reserved_form)) {
                    void* old = NULL;
                    NBHS_Table* curr = nbhs_hazard_access(&latest->prev, hazard_slot + 1);
                    if (curr != NULL) {
                        // should only be one previous table
                        assert(curr->prev == NULL);
                        old = nbhs_raw_lookup(hs, curr, h, val);
                        nbhs_hazard.ptr[hazard_slot + 1] = NULL;
                    }

                    // count doesn't care that it's a migration, it's at least not replacing an existing
                    // slot in this version of the table.
                    atomic_fetch_add_explicit(&latest->count, 1, memory_order_relaxed);
                    if (old != NULL) {
                        atomic_exchange(&latest->data[i], (uintptr_t) old);
                        result = old;
                    } else {
                        // no entry was found, good (reserved -> val)
                        atomic_exchange(&latest->data[i], (uintptr_t) val);
                        result = val;
                    }
                    break;
                }
            }

            void* k = nbhs_try_entry(hs, latest, i, val, entry);
            if (k != NULL) {
                return k;
            }
            i = (i + 1) & mask;
        } while (i != first);

        // if the table changed before our eyes, it means someone resized which sucks
        // but it just means we need to retry
        NBHS_Table* new_latest = nbhs_hazard_access(&hs->latest, 4);
        if (latest == new_latest && result != NULL) {
            nbhs_hazard.ptr[hazard_slot] = NULL;
            nbhs_hazard.ptr[4] = NULL;
            return result;
        }

        // move to the correct hazard slot
        nbhs_hazard.ptr[hazard_slot] = new_latest;
        nbhs_hazard.ptr[4] = NULL;
        latest = new_latest;
    }
}
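
// per-slot lifecycle during an intern, as a little state diagram:
//
//   0 --CAS--> (val | RESERVE) --store--> val   (val was brand new)
//                              \--store--> old  (an equal value already lived
//                                                in the previous table)
//
// only the reserving thread performs the final store, so everyone else either
// sees 0 (keep probing / claim it), a clean pointer (compare against it), or
// the reserved form (compare, then wait for the resolution).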
NBHS nbhs_alloc(size_t initial_cap, NBHS_AllocZeroMem alloc_mem, NBHS_FreeMem free_mem, NBHS_Compare compare, NBHS_Hash hash) {
    if (alloc_mem == NULL) {
        assert(free_mem == NULL);
        alloc_mem = nbhs__alloc_zero_mem;
        free_mem  = nbhs__free_mem;
    }

    size_t exp = 64 - __builtin_clzll(initial_cap - 1);
    NBHS_Table* table = alloc_mem(sizeof(NBHS_Table) + (1ull << exp)*sizeof(uintptr_t));
    table->exp = exp;
    return (NBHS){
        .alloc_mem = alloc_mem,
        .free_mem  = free_mem,
        .compare   = compare,
        .hash      = hash,
        .latest    = table
    };
}
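
// the exp math rounds initial_cap up to a power of two: exp = ceil(log2(cap)).
// e.g. initial_cap = 32 gives exp = 5 (32 slots, resize once 16 are filled),
// while initial_cap = 33 gives exp = 6. this assumes initial_cap >= 2, since
// __builtin_clzll(0) is undefined.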
void nbhs_free(NBHS* hs) {
    NBHS_Table* curr = hs->latest;
    while (curr) {
        NBHS_Table* next = curr->prev;
        hs->free_mem(curr, sizeof(NBHS_Table) + (1ull << curr->exp)*sizeof(uintptr_t));
        curr = next;
    }
}
void* nbhs_intern(NBHS* hs, void* val) {
    NBHS__BEGIN("intern");
    if (!nbhs_hazard_init) {
        NBHS__BEGIN("init");
        nbhs_hazard_init = true;

        // add to hazard list, we never free this because i don't care
        NBHS_HazardEntry* old;
        do {
            old = atomic_load_explicit(&nbhs_hazard_list, memory_order_relaxed);
            nbhs_hazard.next = old;
        } while (!atomic_compare_exchange_strong(&nbhs_hazard_list, &old, &nbhs_hazard));
        NBHS__END();
    }

    NBHS_Table* latest = nbhs_hazard_access(&hs->latest, 0);

    // if there's an earlier version of the table we can move entries up as we
    // go along.
    NBHS_Table* prev = nbhs_hazard_access(&latest->prev, 1);
    if (prev) {
        size_t cap = 1ull << prev->exp;

        // snatch up some number of items
        uint32_t old, new;
        do {
            old = atomic_load(&prev->moved);
            if (old == cap) { goto skip; }

            // cap the number of items to copy... by the cap
            new = old + 32;
            if (new > cap) { new = cap; }
        } while (!atomic_compare_exchange_strong(&prev->moved, &(uint32_t){ old }, new));

        NBHS__BEGIN("copying old");
        for (size_t i = old; i < new; i++) {
            // either NULL or a complete entry can go thru without waiting
            uintptr_t decision;
            while (decision = atomic_load(&prev->data[i]), decision & NBHS_RESERVE_BIT) {
                // idk if i should do a nicer wait than a spin-lock
            }

            if (decision) {
                nbhs_raw_intern(hs, latest, (void*) decision, 2);
            }
        }
        NBHS__END();

        uint32_t done = atomic_fetch_add(&prev->move_done, new - old);
        done += new - old;
        assert(done <= cap);
        if (done == cap) {
            // detach now
            NBHS__BEGIN("detach");
            assert(prev->prev == NULL);
            latest->prev = NULL;

            NBHS__BEGIN("scan");
            // wait for all refs to stop holding on (just read the hazards until they don't match)
            NBHS_HazardEntry* us = &nbhs_hazard;
            for (NBHS_HazardEntry* list = atomic_load(&nbhs_hazard_list); list; list = list->next) {
                // make sure no ptrs refer to prev
                if (us != list) for (size_t i = 0; i < (sizeof(list->ptr)/sizeof(void*)); i++) {
                    while (list->ptr[i] == prev) {
                        // spin-lock
                    }
                }
            }
            NBHS__END();

            // no one's referring to it and no one will ever refer to it again
            hs->free_mem(prev, sizeof(NBHS_Table) + (1ull << prev->exp)*sizeof(uintptr_t));
            NBHS__END();
        }
        skip:;
    }
    nbhs_hazard.ptr[1] = NULL; // ok it can be freed now

    void* result = nbhs_raw_intern(hs, latest, val, 0);
    NBHS__END();
    return result;
}
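
// migration is amortized across callers: every nbhs_intern first claims a
// batch of up to 32 slots from the previous table (via the `moved` cursor),
// re-interns whatever it finds there into the latest table, then bumps
// `move_done`. whichever thread finishes the final batch detaches the old
// table, waits out the hazard pointers, and frees it. no dedicated resizer
// thread, no stop-the-world pause.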
#endif // NBHS_IMPL
////////////////////////////////
// Test driver (second file in the gist)
////////////////////////////////
#define _CRT_SECURE_NO_WARNINGS
#include <stdio.h>
#include <stdint.h>
#include <stddef.h>
#include <stdlib.h>
#include <assert.h>
#include <stdbool.h>
#include <stdatomic.h>

#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#include <process.h>

#include "spall_native_auto.h"
// #define spall_auto_buffer_begin(...)
// #define spall_auto_buffer_end(...)

#define NBHS_IMPL
#include "nbhs.h"

static uint32_t my_hash(const void* a) { return (*(const uint32_t*)a * 11400714819323198485ULL) >> 32ull; }
static bool my_cmp(const void* a, const void* b) { return *(const uint32_t*)a == *(const uint32_t*)b; }
// https://github.com/demetri/scribbles/blob/master/randomness/prngs.c
uint32_t pcg32_pie(uint64_t *state) {
    uint64_t old = *state ^ 0xc90fdaa2adf85459ULL;
    *state = *state * 6364136223846793005ULL + 0xc90fdaa2adf85459ULL;
    uint32_t xorshifted = ((old >> 18u) ^ old) >> 27u;
    uint32_t rot = old >> 59u;
    return (xorshifted >> rot) | (xorshifted << ((-rot) & 31));
}
static NBHS test_set;
static _Atomic int inserted, duplicates;
static int attempts; // per thread

static unsigned int test_thread_fn(void* arg) {
    uintptr_t starting_id = (uintptr_t) arg;
    uint64_t seed = starting_id * 11400714819323198485ULL;

    spall_auto_thread_init(starting_id, SPALL_DEFAULT_BUFFER_SIZE);
    spall_auto_buffer_begin("work", 4, NULL, 0);

    // never freed on purpose: the set stores these pointers directly, so the
    // array has to outlive the set.
    uint32_t* arr = malloc(attempts * sizeof(uint32_t));
    for (int i = 0; i < attempts; i++) {
        arr[i] = pcg32_pie(&seed) & 0xFFFFF;
        if (nbhs_intern(&test_set, &arr[i]) == &arr[i]) {
            inserted += 1;
        } else {
            duplicates += 1;
        }
    }

    spall_auto_buffer_end();
    spall_auto_thread_quit();
    return 0;
}
int main(int argc, char** argv) {
    spall_auto_init((char *)"profile.spall");
    spall_auto_thread_init(0, SPALL_DEFAULT_BUFFER_SIZE);

    if (argc < 2) {
        fprintf(stderr, "usage: %s <thread count>\n", argv[0]);
        return 1;
    }
    int threads = atoi(argv[1]);
    assert(threads >= 1);
    printf("Testing with %d threads\n", threads);

    attempts = 4000000 / threads;
    test_set = nbhs_alloc(32, NULL, NULL, my_cmp, my_hash);

    HANDLE* arr = malloc(threads * sizeof(HANDLE));
    for (int i = 0; i < threads; i++) {
        arr[i] = (HANDLE) _beginthreadex(NULL, 0, test_thread_fn, (void*) (uintptr_t) i, 0, 0);
    }
    WaitForMultipleObjects(threads, arr, TRUE, INFINITE);

    printf("%d + %d = %d (needed %d)\n", inserted, duplicates, inserted + duplicates, attempts*threads);
    if (inserted + duplicates != attempts*threads) {
        printf("FAIL!\n");
        abort();
    }

    spall_auto_thread_quit();
    spall_auto_quit();
    return 0;
}
// #undef spall_auto_buffer_begin
// #undef spall_auto_buffer_end
#define SPALL_AUTO_IMPLEMENTATION
#include "spall_native_auto.h"