// https://groups.google.com/forum/#!topic/comp.programming.threads/2rIqF8MsaxM
// Scalable distributed reference counting with lightweight acquire/
// release operations.
//
// It supports:
// - Reference counting with basic thread safety
// - Reference counting with strong thread safety
// - Reference counting with cross-thread references
// - Ultra-low-overhead PDR
// - PDR with long-term cross-thread references
//
// Assumptions I made about the environment:
// - Fixed number of threads. I made this assumption partly for
//   simplicity and partly because I'm interested in such an
//   environment. It can be lifted by adding a thread registration
//   mechanism.
// - Fixed maximum number of reference-counted objects. Solely for
//   simplicity. Dynamic allocation of the helper structures would
//   have to be added.
// - User threads must periodically execute a special function. I made
//   this assumption partly for simplicity and partly because I'm
//   interested in such an environment. It can be lifted by inserting
//   the call to this special function into the acquire/release
//   operations, or via some other mechanism such as signals/APC, etc.
//
// Here is the public API:
typedef void (*rcx_dtor_fp_t)(void*);
typedef unsigned rcx_t[4];
int rcx_sys_init(unsigned thread_count);
void rcx_sys_deinit();
int rcx_sys_thread_init(unsigned thread_idx);
void rcx_sys_thread_deinit();
// must be periodically executed by all user threads
void rcx_process();
// explicitly initiate garbage collection (optional)
void rcx_collect();
int rcx_create(rcx_t* rcx, rcx_dtor_fp_t dtor_fp, void* state);
void rcx_destroy(rcx_t* rcx);
void* rcx_get_state(rcx_t* rcx);
void rcx_acquire(rcx_t* rcx);
void rcx_release(rcx_t* rcx);
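//
// A minimal single-threaded usage sketch (my illustration, not part of
// the original post; `node_t` and `node_dtor` are hypothetical). The
// exact moment the dtor runs depends on the epoch machinery below: an
// object whose count drops to zero is destroyed only after sitting in
// the dtor list for a full epoch, hence the two rcx_collect() calls.
#if 0
#include <stdio.h>
#include <stdlib.h>
#include "rcx.h"

typedef struct node_s { int value; } node_t;

static void node_dtor(void* state) { free(state); }

int main(void) {
    rcx_t ref;
    node_t* node = malloc(sizeof(node_t));
    node->value = 42;
    rcx_sys_init(1);         // one user thread
    rcx_sys_thread_init(0);  // register the calling thread as index 0
    rcx_create(&ref, node_dtor, node);  // count == 1
    rcx_acquire(&ref);                  // count == 2
    printf("%d\n", ((node_t*)rcx_get_state(&ref))->value);
    rcx_release(&ref);  // count == 1
    rcx_release(&ref);  // count == 0; dtor is deferred
    rcx_collect();      // epoch 1: object enters the dtor list
    rcx_collect();      // epoch 2: node_dtor runs
    rcx_destroy(&ref);  // return the descriptor to the freelist
    rcx_sys_thread_deinit();
    rcx_sys_deinit();
    return 0;
}
#endif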
// Here is the implementation:
#include <stdlib.h>
#include "rcx.h"
typedef char rcx_cacheline_pad_t[128];
static unsigned const rcx_max_object_count = 1024;
static unsigned const rcx_activity_acq = 1;
static unsigned const rcx_activity_rel = 1024;
static unsigned const rcx_activity_rel_new = 16 * 1024;
static unsigned const rcx_activity_threshold = 128 * 16 * 1024;
static unsigned const rcx_in_list_flag = 1u << 31;
#define FULL_FENCE() __asm mfence
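// NB: `__asm mfence` is 32-bit MSVC inline assembly; it won't compile
// on other toolchains or for x64 MSVC. A sketch of a portable
// equivalent, assuming a C11 compiler (my substitution, not part of
// the original):
//   #include <stdatomic.h>
//   #define FULL_FENCE() atomic_thread_fence(memory_order_seq_cst)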
static void rcx_process_epoch();
// internal decoding of rcx_t
// (rcx_t is four unsigneds, which matches this layout on 32-bit targets)
typedef struct rcx_int_s {
    // index of the per-thread and global object descriptors
    unsigned idx_;
    // index of the owner thread
    unsigned thread_idx_;
    // user-supplied dtor and state
    rcx_dtor_fp_t dtor_fp_;
    void* state_;
} rcx_int_t;
// per-thread object descriptor
typedef struct rcx_obj_data_s {
    // cache for the thread's acquire/release operations;
    // the high bit is used as an "in rc_list_" flag
    unsigned rc_;
} rcx_obj_data_t;
// global object descriptor
typedef struct rcx_gl_obj_data_s {
    // pointer to the owning rcx_t object
    rcx_int_t* rcx_;
    // global reference count
    unsigned rc_;
    // doubly-linked list of object descriptors
    // whose rc dropped to zero
    struct rcx_gl_obj_data_s* volatile next_;
    struct rcx_gl_obj_data_s* volatile prev_;
} rcx_gl_obj_data_t;
// per-thread data
typedef struct rcx_thread_s {
    // index [0 .. thread_count-1]
    unsigned index_;
    // list of free object descriptors
    rcx_gl_obj_data_t* volatile freelist_head_;
    rcx_gl_obj_data_t* freelist_tail_;
    // cache of foreign free object descriptors
    rcx_gl_obj_data_t* freelist_cache_;
    // array of per-thread object descriptors
    rcx_obj_data_t* obj_data_;
    // array of object descriptors
    // for which acquire/release operations
    // were executed in the current epoch
    rcx_int_t** rc_list_;
    // list of object descriptors
    // whose rc dropped to zero
    rcx_gl_obj_data_t* dtor_list_;
    // head and tail fake nodes for dtor_list_
    rcx_gl_obj_data_t dtor_fake_[2];
    // padding to reduce false sharing between threads
    rcx_cacheline_pad_t pad_;
} rcx_thread_t;
// per-thread data stored in TLS
typedef struct rcx_thread_local_s {
    // copy of rcx_thread_t::obj_data_
    rcx_obj_data_t* obj_data_;
    // copy of rcx_thread_t::rc_list_
    rcx_int_t** rc_list_;
    // estimate of the 'activity' produced by the thread;
    // used to decide when to initiate an epoch shift
    unsigned activity_;
    // current position in rc_list_
    unsigned rc_list_pos_;
    // copy of rcx_thread_t::index_
    unsigned index_;
} rcx_thread_local_t;
// global data
typedef struct rcx_global_s {
    // array of global object descriptors
    rcx_gl_obj_data_t* gl_obj_data_;
    // array of per-thread data
    rcx_thread_t* threads_;
    // thread count
    unsigned thread_count_;
    // flag indicating that some thread
    // wants to make an epoch shift
    unsigned volatile epoch_pending_;
    // index of the thread which is currently allowed
    // to copy its local counters to the global counters
    unsigned volatile epoch_order_;
} rcx_global_t;
static rcx_global_t rcx_global;
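// NB: `__declspec(thread)` is MSVC-specific; with a C11 compiler,
// `_Thread_local` (or GCC/Clang's `__thread`) would be the portable
// equivalent (my note, not part of the original).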
static __declspec(thread) rcx_thread_local_t rcx_thread;
int rcx_sys_init(unsigned thread_count) {
    unsigned i, j, first, last;
    rcx_global.gl_obj_data_ =
        calloc(rcx_max_object_count, sizeof(rcx_gl_obj_data_t));
    rcx_global.threads_ = calloc(thread_count, sizeof(rcx_thread_t));
    rcx_global.thread_count_ = thread_count;
    rcx_global.epoch_pending_ = 0;
    rcx_global.epoch_order_ = thread_count;
    for (i = 0; i != rcx_global.thread_count_; ++i) {
        rcx_thread_t* th = &rcx_global.threads_[i];
        th->index_ = i;
        th->rc_list_ = calloc(rcx_max_object_count, sizeof(rcx_int_t*));
        th->obj_data_ = calloc(rcx_max_object_count, sizeof(rcx_obj_data_t));
        // carve out this thread's contiguous slice of descriptors
        // and link it into a singly-linked freelist
        first = i * (rcx_max_object_count / thread_count);
        last = (i + 1) * (rcx_max_object_count / thread_count) - 1;
        for (j = first; j != last; ++j) {
            // while a descriptor is free, its rcx_ field stores
            // the descriptor's own index rather than a pointer
            rcx_global.gl_obj_data_[j].rcx_ = (rcx_int_t*)j;
            rcx_global.gl_obj_data_[j].next_ = &rcx_global.gl_obj_data_[j + 1];
        }
        rcx_global.gl_obj_data_[last].rcx_ = (rcx_int_t*)last;
        th->freelist_head_ = &rcx_global.gl_obj_data_[last];
        th->freelist_tail_ = &rcx_global.gl_obj_data_[first];
        th->freelist_cache_ = 0;
        th->dtor_list_ = &th->dtor_fake_[0];
        th->dtor_fake_[0].next_ = &th->dtor_fake_[1];
        th->dtor_fake_[0].prev_ = 0;
        th->dtor_fake_[1].next_ = 0;
        th->dtor_fake_[1].prev_ = &th->dtor_fake_[0];
    }
    return 0;
}
void rcx_sys_deinit() {
    // here we ought to free all surviving rcx_t objects;
    // that is not implemented yet
    free(rcx_global.threads_);
    free(rcx_global.gl_obj_data_);
}
int rcx_sys_thread_init(unsigned thread_idx) {
    rcx_thread.obj_data_ = rcx_global.threads_[thread_idx].obj_data_;
    rcx_thread.rc_list_ = rcx_global.threads_[thread_idx].rc_list_;
    rcx_thread.rc_list_pos_ = 0;
    rcx_thread.activity_ = 0;
    rcx_thread.index_ = thread_idx;
    return 0;
}
void rcx_sys_thread_deinit() {}
int rcx_create(rcx_t* rcx, rcx_dtor_fp_t dtor_fp, void* state) {
    rcx_int_t* x = (rcx_int_t*)rcx;
    rcx_thread_t* th = &rcx_global.threads_[rcx_thread.index_];
    // pop a free object descriptor from the per-thread queue
    rcx_gl_obj_data_t* gl = th->freelist_tail_;
    th->freelist_tail_ = th->freelist_tail_->next_;
    // a free descriptor stores its own index in the rcx_ field
    x->idx_ = (unsigned)gl->rcx_;
    x->thread_idx_ = rcx_thread.index_;
    x->dtor_fp_ = dtor_fp;
    x->state_ = state;
    gl->rcx_ = x;
    // the initial reference belongs to the creator
    gl->rc_ = 1;
    gl->next_ = 0;
    gl->prev_ = 0;
    return 0;
}
void rcx_destroy(rcx_t* rcx) {
    rcx_int_t* x = (rcx_int_t*)rcx;
    rcx_thread_t* th = &rcx_global.threads_[rcx_thread.index_];
    rcx_gl_obj_data_t* gl = &rcx_global.gl_obj_data_[x->idx_];
    // while free, rcx_ holds the descriptor's index
    // and rc_ holds the owner thread's index
    gl->rcx_ = (rcx_int_t*)x->idx_;
    gl->rc_ = x->thread_idx_;
    // return the object descriptor to the local cache;
    // rcx_process_epoch() hands it back to the owner thread
    gl->next_ = th->freelist_cache_;
    th->freelist_cache_ = gl;
}
void rcx_acquire(rcx_t* rcx) {
    rcx_int_t* x = (rcx_int_t*)rcx;
    rcx_thread_local_t* th = &rcx_thread;
    // find the per-thread object descriptor
    rcx_obj_data_t* obj = &th->obj_data_[x->idx_];
    // check whether the descriptor is already in the local list of
    // descriptors for which this thread has executed an
    // acquire/release operation during the current epoch
    if (obj->rc_) {
        // descriptor already in the list:
        // just increment the counter
        obj->rc_ += 1;
    } else {
        // descriptor not in the list:
        // increment and mark the counter
        obj->rc_ += 1 + rcx_in_list_flag;
        // add the descriptor to the list
        th->rc_list_[th->rc_list_pos_++] = x;
    }
    th->activity_ += rcx_activity_acq;
}
void rcx_release(rcx_t* rcx) {
    rcx_int_t* x = (rcx_int_t*)rcx;
    rcx_thread_local_t* th = &rcx_thread;
    rcx_obj_data_t* obj = &th->obj_data_[x->idx_];
    if (obj->rc_) {
        // descriptor already in the list: just decrement
        obj->rc_ -= 1;
        th->activity_ += rcx_activity_rel;
    } else {
        // descriptor not in the list: decrement, mark, and record it
        obj->rc_ += -1 + rcx_in_list_flag;
        th->rc_list_[th->rc_list_pos_++] = x;
        th->activity_ += rcx_activity_rel_new;
    }
}
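// Worked example of the counter encoding (my illustration): starting
// from a local rc_ of 0, an acquire stores rcx_in_list_flag + 1 and
// records the object in rc_list_; two subsequent releases leave
// rcx_in_list_flag - 1. At the next epoch shift rcx_process_epoch()
// adds (rcx_in_list_flag - 1) - rcx_in_list_flag == -1 (mod 2^32) to
// the global count, the net effect of the three operations. The flag
// bit only marks "already recorded in rc_list_", so each object is
// appended at most once per epoch and rc_list_ cannot overflow. No
// atomic RMW is needed on this fast path because each thread mutates
// only its own obj_data_ and rc_list_.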
void* rcx_get_state(rcx_t* rcx) {
    rcx_int_t* x = (rcx_int_t*)rcx;
    return x->state_;
}
void rcx_process() {
    rcx_thread_local_t* th = &rcx_thread;
    if (th->activity_ >= rcx_activity_threshold &&
            0 == rcx_global.epoch_pending_) {
        // we've generated enough activity:
        // initiate an epoch shift
        rcx_global.epoch_pending_ = 1;
    }
    if (0 == th->index_ && rcx_global.epoch_pending_ &&
            rcx_global.epoch_order_ == rcx_global.thread_count_) {
        // the thread with index 0
        // starts epoch processing
        rcx_global.epoch_order_ = 0;
    }
    if (th->index_ == rcx_global.epoch_order_) {
        // it's my turn - process the epoch
        rcx_process_epoch();
        // notify the next thread
        rcx_global.epoch_order_ += 1;
        if (rcx_global.epoch_order_ == rcx_global.thread_count_) {
            rcx_global.epoch_pending_ = 0;
        }
    }
}
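// Summary of the epoch protocol above (my reading of the code): any
// thread whose activity_ crosses the threshold raises epoch_pending_;
// thread 0 then resets epoch_order_ to 0, and each thread, on a later
// rcx_process() call, runs rcx_process_epoch() in index order and
// passes the turn by incrementing epoch_order_. Once the last thread
// has run, the pending flag is cleared and a new epoch begins.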
void rcx_collect() {
    if (0 == rcx_global.epoch_pending_) rcx_global.epoch_pending_ = 1;
    rcx_process();
}
void rcx_process_epoch() {
    rcx_thread_t* th = &rcx_global.threads_[rcx_thread.index_];
    rcx_thread_t* th2;
    rcx_gl_obj_data_t* cur;
    rcx_gl_obj_data_t* next;
    rcx_obj_data_t* loc;
    unsigned i;
    rcx_int_t* req;
    // hand cached object descriptors back to their owner threads
    while (th->freelist_cache_) {
        cur = th->freelist_cache_;
        next = cur->next_;
        // rc_ of a free descriptor holds the owner thread's index
        th2 = &rcx_global.threads_[cur->rc_];
        th2->freelist_head_->next_ = cur;
        th2->freelist_head_ = cur;
        th->freelist_cache_ = next;
    }
    // execute the dtor function for objects
    // which have stayed in the dtor list
    // for a full epoch
    cur = th->dtor_list_->next_;
    if (cur->next_) {
        while (cur->next_) {
            next = cur->next_;
            cur->rc_ = 1;
            cur->next_ = 0;
            cur->prev_ = 0;
            // at this point the object descriptor is ready for reuse
            cur->rcx_->dtor_fp_(cur->rcx_->state_);
            cur = next;
        }
        th->dtor_fake_[0].next_ = &th->dtor_fake_[1];
        th->dtor_fake_[1].prev_ = &th->dtor_fake_[0];
    }
    // transfer all acquire/release operations
    // from the local descriptors to the global ones
    for (i = 0; i != rcx_thread.rc_list_pos_; ++i) {
        req = th->rc_list_[i];
        // global descriptor
        cur = &rcx_global.gl_obj_data_[req->idx_];
        // local descriptor
        loc = &th->obj_data_[req->idx_];
        // strip the in-list flag and merge the local delta
        cur->rc_ += loc->rc_ - rcx_in_list_flag;
        loc->rc_ = 0;
        if (cur->rc_ && cur->next_) {
            // rc is non-zero again: remove the object from the dtor list
            cur->prev_->next_ = cur->next_;
            cur->next_->prev_ = cur->prev_;
            cur->next_ = 0;
            cur->prev_ = 0;
        } else if (0 == cur->rc_ && 0 == cur->next_) {
            // rc dropped to zero: insert the object into the dtor list
            cur->next_ = th->dtor_list_->next_;
            cur->prev_ = th->dtor_list_;
            th->dtor_list_->next_->prev_ = cur;
            th->dtor_list_->next_ = cur;
        } else if (0 == cur->rc_ && cur->next_) {
            // remove and reinsert the object, restarting its grace period
            cur->prev_->next_ = cur->next_;
            cur->next_->prev_ = cur->prev_;
            cur->next_ = th->dtor_list_->next_;
            cur->prev_ = th->dtor_list_;
            th->dtor_list_->next_->prev_ = cur;
            th->dtor_list_->next_ = cur;
        }
    }
    // reset the local per-epoch variables
    rcx_thread.activity_ = 0;
    rcx_thread.rc_list_pos_ = 0;
    // to support PDR
    FULL_FENCE();
}