Last active: February 9, 2017 21:44
Save nickhutchinson/37daa75771d50ff1eff2e905b142871d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// https://groups.google.com/forum/#!topic/comp.programming.threads/2rIqF8MsaxM | |
// Scalable distributed reference counting with lightweight acquire/ | |
// release operations. | |
// | |
// It supports: | |
// - Reference counting with basic thread safety
// - Reference counting with strong thread safety
// - Reference counting with cross-thread references | |
// - Ultra low overhead PDR | |
// - PDR with long-term cross-thread references | |
// | |
// Assumptions about environment I made. | |
// - Fixed number of threads. I made this assumption partly for | |
// simplicity and partly because I'm interested in such environment. This | |
// can be overcome with addition of thread registration mechanism. | |
// - Fixed maximum number of reference counted objects. Solely for | |
// simplicity. Dynamic allocation of helper structures must be added. | |
// - User threads must periodically execute special function. I made this | |
// assumption partly for simplicity and partly because I'm interested in | |
// such environment. This can be overcome if call to this special | |
// function will be inserted into acquire/release operations. Or with | |
// some other mechanisms like signals/APC etc. | |
// | |
// Here is public API: | |
// Public API.
// rcx_t is an opaque handle of 4 unsigned slots, decoded internally as
// rcx_int_t. NOTE(review): on 64-bit targets sizeof(rcx_int_t)
// (2 unsigned + function pointer + void* = 24 bytes) exceeds
// sizeof(rcx_t) (16 bytes) — as written this code assumes a 32-bit
// target; confirm before porting.
typedef void (*rcx_dtor_fp_t)(void*);
typedef unsigned rcx_t[4];
// initialize the subsystem for a fixed number of threads; returns 0 on success
int rcx_sys_init(unsigned thread_count);
void rcx_sys_deinit();
// bind the calling thread to slot thread_idx [0 .. thread_count-1]
int rcx_sys_thread_init(unsigned thread_idx);
void rcx_sys_thread_deinit();
// must be periodically executed by all user threads
void rcx_process();
// voluntarily initiate garbage collection (optional)
void rcx_collect();
// create a counted object; dtor_fp(state) runs after the count drops to
// zero and a full epoch has elapsed; returns 0 on success
int rcx_create(rcx_t* rcx, rcx_dtor_fp_t dtor_fp, void* state);
void rcx_destroy(rcx_t* rcx);
void* rcx_get_state(rcx_t* rcx);
// acquire/release operate on the calling thread's private counter cache
void rcx_acquire(rcx_t* rcx);
void rcx_release(rcx_t* rcx);
// Here is implementation: | |
#include <stdlib.h> | |
#include "rcx.h" | |
// 128-byte pad to keep per-thread hot data on separate cache lines
typedef char rcx_cacheline_pad_t[128];
// fixed capacity: total number of reference-counted objects (see assumptions)
static unsigned const rcx_max_object_count = 1024;
// 'activity' weights: how much each operation contributes toward
// triggering an epoch shift in rcx_process
static unsigned const rcx_activity_acq = 1;
static unsigned const rcx_activity_rel = 1024;
static unsigned const rcx_activity_rel_new = 16 * 1024;
static unsigned const rcx_activity_threshold = 128 * 16 * 1024;
// high bit of a per-thread counter marks "descriptor already in rc_list_"
static unsigned const rcx_in_list_flag = 1u << 31;
// MSVC-only inline asm; full memory barrier (x86 mfence)
#define FULL_FENCE() __asm mfence
static void rcx_process_epoch();
// decoding of rcx_t: internal view of the opaque 4-slot handle
typedef struct rcx_int_s {
  // index of per thread and global object descriptor
  unsigned idx_;
  // index of owner thread (the thread that called rcx_create)
  unsigned thread_idx_;
  // user supplied dtor and state
  rcx_dtor_fp_t dtor_fp_;
  void* state_;
} rcx_int_t;
// per thread object descriptor
typedef struct rcx_obj_data_s {
  // cache for thread's acquire/release operations;
  // high bit (rcx_in_list_flag) used as "in rc_list_" flag,
  // low 31 bits are a signed-in-disguise delta applied to the
  // global count at epoch shift
  unsigned rc_;
} rcx_obj_data_t;
// global object descriptor
typedef struct rcx_gl_obj_data_s {
  // pointer to owning rcx_t object while live;
  // while the descriptor is free, this field stores the descriptor's
  // own index cast to a pointer (see rcx_sys_init / rcx_destroy)
  rcx_int_t* rcx_;
  // global reference count while live; while cached for return to the
  // owner thread, reused to hold the owning thread's index
  unsigned rc_;
  // double-linked list of object descriptors
  // for which rc drops to zero; also reused as the
  // singly-linked freelist / cache link while free
  struct rcx_gl_obj_data_s* volatile next_;
  struct rcx_gl_obj_data_s* volatile prev_;
} rcx_gl_obj_data_t;
// per thread data
typedef struct rcx_thread_s {
  // index [0 .. thread_count-1]
  unsigned index_;
  // list of free object descriptors (consumed from tail by rcx_create,
  // appended at head by rcx_process_epoch — hence head_ is volatile)
  rcx_gl_obj_data_t* volatile freelist_head_;
  rcx_gl_obj_data_t* freelist_tail_;
  // cache of foreign free object descriptors, routed back to their
  // owner threads during rcx_process_epoch
  rcx_gl_obj_data_t* freelist_cache_;
  // array of per thread object descriptors
  rcx_obj_data_t* obj_data_;
  // array of object descriptors
  // for which acquire/release operations
  // was executed in current epoch
  rcx_int_t** rc_list_;
  // list of object descriptors
  // for which rc drops to zero
  rcx_gl_obj_data_t* dtor_list_;
  // head and tail sentinel nodes for dtor_list_
  rcx_gl_obj_data_t dtor_fake_[2];
  // padding against false sharing with the next thread's slot
  rcx_cacheline_pad_t pad_;
} rcx_thread_t;
// per thread data stored in tls (hot-path fields cached out of
// rcx_thread_t to avoid the extra indirection on acquire/release)
typedef struct rcx_thread_local_s {
  // copy of rcx_thread_t::obj_data_
  rcx_obj_data_t* obj_data_;
  // copy of rcx_thread_t::rc_list_
  rcx_int_t** rc_list_;
  // estimation of 'activity' produced by thread
  // used for initiation of epoch shift
  unsigned activity_;
  // current position in rc_list_
  unsigned rc_list_pos_;
  // copy of rcx_thread_t::index_
  unsigned index_;
} rcx_thread_local_t;
// global data
typedef struct rcx_global_s {
  // array of global object descriptors (rcx_max_object_count entries)
  rcx_gl_obj_data_t* gl_obj_data_;
  // array of per thread data (thread_count_ entries)
  rcx_thread_t* threads_;
  // thread count
  unsigned thread_count_;
  // flag that some thread
  // wants to make epoch shift
  unsigned volatile epoch_pending_;
  // index of thread which allowed
  // to copy local counters to global counters;
  // equals thread_count_ when no epoch shift is in progress
  unsigned volatile epoch_order_;
} rcx_global_t;
static rcx_global_t rcx_global;
// MSVC thread-local storage for the calling thread's cached state
static __declspec(thread) rcx_thread_local_t rcx_thread;
int rcx_sys_init(unsigned thread_count) { | |
unsigned i, j, first, last; | |
rcx_global.gl_obj_data_ = | |
calloc(rcx_max_object_count, sizeof(rcx_gl_obj_data_t)); | |
rcx_global.threads_ = calloc(thread_count, sizeof(rcx_thread_t)); | |
rcx_global.thread_count_ = thread_count; | |
rcx_global.epoch_pending_ = 0; | |
rcx_global.epoch_order_ = thread_count; | |
for (i = 0; i != rcx_global.thread_count_; ++i) { | |
rcx_thread_t* th = &rcx_global.threads_[i]; | |
th->index_ = i; | |
th->rc_list_ = calloc(rcx_max_object_count, sizeof(rcx_int_t*)); | |
th->obj_data_ = calloc(rcx_max_object_count, sizeof(rcx_obj_data_t)); | |
first = i * (rcx_max_object_count / thread_count); | |
last = (i + 1) * (rcx_max_object_count / thread_count) - 1; | |
for (j = first; j != last; ++j) { | |
rcx_global.gl_obj_data_[j].rcx_ = (rcx_int_t*)j; | |
rcx_global.gl_obj_data_[j].next_ = &rcx_global.gl_obj_data_[j + 1]; | |
} | |
rcx_global.gl_obj_data_[last].rcx_ = (rcx_int_t*)last; | |
th->freelist_head_ = &rcx_global.gl_obj_data_[last]; | |
th->freelist_tail_ = &rcx_global.gl_obj_data_[first]; | |
th->freelist_cache_ = 0; | |
th->dtor_list_ = &th->dtor_fake_[0]; | |
th->dtor_fake_[0].next_ = &th->dtor_fake_[1]; | |
th->dtor_fake_[0].prev_ = 0; | |
th->dtor_fake_[1].next_ = 0; | |
th->dtor_fake_[1].prev_ = &th->dtor_fake_[0]; | |
} | |
return 0; | |
} | |
void rcx_sys_deinit() { | |
// here we must free all survived rcx_t objects | |
// it's not implemented yet | |
free(rcx_global.threads_); | |
free(rcx_global.gl_obj_data_); | |
} | |
int rcx_sys_thread_init(unsigned thread_idx) { | |
rcx_thread.obj_data_ = rcx_global.threads_[thread_idx].obj_data_; | |
rcx_thread.rc_list_ = rcx_global.threads_[thread_idx].rc_list_; | |
rcx_thread.rc_list_pos_ = 0; | |
rcx_thread.activity_ = 0; | |
rcx_thread.index_ = thread_idx; | |
return 0; | |
} | |
void rcx_sys_thread_deinit() {} | |
// Create a counted object: pop a descriptor from the calling thread's
// freelist, encode it into the rcx handle, and start the global count
// at 1. Returns 0 on success, nonzero if the thread's descriptor pool
// is exhausted.
int rcx_create(rcx_t* rcx, rcx_dtor_fp_t dtor_fp, void* state) {
  rcx_int_t* x = (rcx_int_t*)rcx;
  rcx_thread_t* th = &rcx_global.threads_[rcx_thread.index_];
  // get free object descriptor from per thread queue
  rcx_gl_obj_data_t* gl = th->freelist_tail_;
  // Fix: keep one node in the freelist as a dummy. The node at
  // freelist_head_ (next_ == 0) is concurrently written by
  // rcx_process_epoch when foreign threads return descriptors, so it
  // must never be handed out; this also prevents freelist_tail_ from
  // becoming NULL on exhaustion (the original would then crash on the
  // next call).
  if (0 == gl->next_) return 1;
  th->freelist_tail_ = gl->next_;
  // while free, gl->rcx_ held the descriptor's own index
  x->idx_ = (unsigned)gl->rcx_;
  x->thread_idx_ = rcx_thread.index_;
  x->dtor_fp_ = dtor_fp;
  x->state_ = state;
  gl->rcx_ = x;
  gl->rc_ = 1;
  gl->next_ = 0;
  gl->prev_ = 0;
  return 0;
}
// Return an object's descriptor to the calling thread's local cache.
// The descriptor is routed back to its owner thread during the next
// rcx_process_epoch.
void rcx_destroy(rcx_t* rcx) {
  rcx_int_t* x = (rcx_int_t*)rcx;
  rcx_thread_t* th = &rcx_global.threads_[rcx_thread.index_];
  rcx_gl_obj_data_t* gl = &rcx_global.gl_obj_data_[x->idx_];
  // while free, rcx_ stores the descriptor's own index (as in rcx_sys_init)
  gl->rcx_ = (rcx_int_t*)x->idx_;
  // while cached, rc_ is reused to remember the owning thread's index;
  // rcx_process_epoch uses it to route the descriptor home
  gl->rc_ = x->thread_idx_;
  // return object descriptor to local cache
  gl->next_ = th->freelist_cache_;
  th->freelist_cache_ = gl;
}
// Increment the calling thread's private counter for the object.
// No atomics: the delta is merged into the global count at the next
// epoch shift (rcx_process_epoch).
void rcx_acquire(rcx_t* rcx) {
  rcx_int_t* hdr = (rcx_int_t*)rcx;
  rcx_thread_local_t* tls = &rcx_thread;
  // per thread counter slot for this object
  rcx_obj_data_t* slot = &tls->obj_data_[hdr->idx_];
  if (0 == slot->rc_) {
    // first acquire/release on this object in the current epoch:
    // set the count to 1, mark the slot, and register the object in
    // this thread's per-epoch list
    slot->rc_ = 1 + rcx_in_list_flag;
    tls->rc_list_[tls->rc_list_pos_] = hdr;
    tls->rc_list_pos_ += 1;
  } else {
    // slot already registered this epoch; plain increment
    slot->rc_ += 1;
  }
  tls->activity_ += rcx_activity_acq;
}
// Decrement the calling thread's private counter for the object.
// A slot may go "negative" relative to the flag bit; the signed delta
// is reconciled against the global count at the next epoch shift.
void rcx_release(rcx_t* rcx) {
  rcx_int_t* hdr = (rcx_int_t*)rcx;
  rcx_thread_local_t* tls = &rcx_thread;
  rcx_obj_data_t* slot = &tls->obj_data_[hdr->idx_];
  if (0 == slot->rc_) {
    // first op on this object in the epoch: record a -1 delta under
    // the in-list flag and register the object for the epoch merge
    slot->rc_ = rcx_in_list_flag - 1;
    tls->rc_list_[tls->rc_list_pos_] = hdr;
    tls->rc_list_pos_ += 1;
    tls->activity_ += rcx_activity_rel_new;
  } else {
    slot->rc_ -= 1;
    tls->activity_ += rcx_activity_rel;
  }
}
// Return the user state pointer stored in the handle at rcx_create time.
void* rcx_get_state(rcx_t* rcx) {
  return ((rcx_int_t*)rcx)->state_;
}
// Periodic hook every user thread must call.
// Drives the epoch machinery: any thread can request a shift once it
// has accumulated enough activity; thread 0 starts the shift; then
// each thread in index order runs rcx_process_epoch exactly once
// (epoch_order_ acts as a baton, so epochs never run concurrently).
void rcx_process() {
  rcx_thread_local_t* th = &rcx_thread;
  if (th->activity_ >= rcx_activity_threshold &&
      0 == rcx_global.epoch_pending_) {
    // we've created enough activity
    // initiate epoch shift
    rcx_global.epoch_pending_ = 1;
  }
  if (0 == th->index_ && rcx_global.epoch_pending_ &&
      rcx_global.epoch_order_ == rcx_global.thread_count_) {
    // thread with index 0
    // starts epoch processing
    rcx_global.epoch_order_ = 0;
  }
  if (th->index_ == rcx_global.epoch_order_) {
    // it's my turn - process epoch
    rcx_process_epoch();
    // notify next thread (pass the baton)
    rcx_global.epoch_order_ += 1;
    if (rcx_global.epoch_order_ == rcx_global.thread_count_) {
      // last thread completed: epoch shift finished
      rcx_global.epoch_pending_ = 0;
    }
  }
}
void rcx_collect() { | |
if (0 == rcx_global.epoch_pending_) rcx_global.epoch_pending_ = 1; | |
rcx_process(); | |
} | |
void rcx_process_epoch() { | |
rcx_thread_t* th = &rcx_global.threads_[rcx_thread.index_]; | |
rcx_thread_t* th2; | |
rcx_gl_obj_data_t* cur; | |
rcx_gl_obj_data_t* next; | |
rcx_obj_data_t* loc; | |
unsigned i; | |
rcx_int_t* req; | |
// transfer cached object descriptors to owner thread | |
while (th->freelist_cache_) { | |
cur = th->freelist_cache_; | |
next = cur->next_; | |
th2 = &rcx_global.threads_[cur->rc_]; | |
th2->freelist_head_->next_ = cur; | |
th2->freelist_head_ = cur; | |
th->freelist_cache_ = next; | |
} | |
// execute dtor function for objects | |
// which are located in dtor list | |
// for full epoch | |
cur = th->dtor_list_->next_; | |
if (cur->next_) { | |
while (cur->next_) { | |
next = cur->next_; | |
cur->rc_ = 1; | |
cur->next_ = 0; | |
cur->prev_ = 0; | |
// here object descriptor is ready for reuse | |
cur->rcx_->dtor_fp_(cur->rcx_->state_); | |
cur = next; | |
} | |
th->dtor_fake_[0].next_ = &th->dtor_fake_[1]; | |
th->dtor_fake_[1].prev_ = &th->dtor_fake_[0]; | |
} | |
// transfer all acquire/release operations | |
// from local descriptor to global | |
for (i = 0; i != rcx_thread.rc_list_pos_; ++i) { | |
req = th->rc_list_[i]; | |
// global descriptor | |
cur = &rcx_global.gl_obj_data_[req->idx_]; | |
// local descriptor | |
loc = &th->obj_data_[req->idx_]; | |
cur->rc_ += loc->rc_ - rcx_in_list_flag; | |
loc->rc_ = 0; | |
if (cur->rc_ && cur->next_) { | |
// remove object from dtor list | |
cur->prev_->next_ = cur->next_; | |
cur->next_->prev_ = cur->prev_; | |
cur->next_ = 0; | |
cur->prev_ = 0; | |
} else if (0 == cur->rc_ && 0 == cur->next_) { | |
// insert object to dtor list | |
cur->next_ = th->dtor_list_->next_; | |
cur->prev_ = th->dtor_list_; | |
th->dtor_list_->next_->prev_ = cur; | |
th->dtor_list_->next_ = cur; | |
} else if (0 == cur->rc_ && cur->next_) { | |
// remove and reinsert object to dtor list | |
cur->prev_->next_ = cur->next_; | |
cur->next_->prev_ = cur->prev_; | |
cur->next_ = th->dtor_list_->next_; | |
cur->prev_ = th->dtor_list_; | |
th->dtor_list_->next_->prev_ = cur; | |
th->dtor_list_->next_ = cur; | |
} | |
} | |
// reset local per-epoch variables | |
rcx_thread.activity_ = 0; | |
rcx_thread.rc_list_pos_ = 0; | |
// to support PDR | |
FULL_FENCE(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment