simple read/write thread memory gc
#include "thread_id_pool.h" | |
#include <pthread.h> | |
#include <assert.h> | |
// Process-wide thread-specific-data key; each thread's slot holds the
// ThreadIdPool::TVal installed by ThreadIdPool::get().
static pthread_key_t s_tkey;
// Guards the one-time creation of s_tkey. POSIX leaves pthread_once()
// undefined unless the control object is initialized with PTHREAD_ONCE_INIT,
// so do not rely on zero-initialization of statics here.
static pthread_once_t s_tkey_once = PTHREAD_ONCE_INIT;
static void freeKey(void *p) | |
{ | |
ThreadIdPool::TVal *val = (ThreadIdPool::TVal*) p; | |
val->self->destroyVal(val); | |
} | |
static void makeKey() | |
{ | |
pthread_key_create(&s_tkey, freeKey); | |
} | |
// Allocates a record that binds `id` to this pool. The record takes one
// reference on the pool (addRef); destroyVal() is the matching teardown.
ThreadIdPool::TVal *ThreadIdPool::createVal(size_t id)
{
    TVal *record = new TVal();
    record->self = this;
    addRef();
    record->id = id;
    return record;
}
// Tears down a record made by createVal(): returns its id via put(), drops
// the reference taken in createVal(), then frees the record.
void ThreadIdPool::destroyVal(TVal *val)
{
    ThreadIdPool *owner = val->self;
    assert(owner == this);
    owner->put(val->id);
    owner->release();
    delete val;
}
// Fills the id table with 1..TPG_TID_COUNT; 0 is never stored, and get()
// uses 0 as its "no id available" result.
ThreadIdPool::ThreadIdPool()
{
    for (size_t slot = 0; slot < TPG_TID_COUNT; ++slot) {
        _ids[slot] = slot + 1;
    }
}
size_t ThreadIdPool::get() | |
{ | |
pthread_once(&s_tkey_once, makeKey); | |
TVal *val = (TVal*) pthread_getspecific(s_tkey); | |
if (val != NULL) return val->id; | |
int i = _counter.incAndReturn() - 1; | |
if (i >= TPG_TID_COUNT) return 0; | |
size_t id = _ids[i]; | |
pthread_setspecific(s_tkey, createVal(id)); | |
return id; | |
} | |
void ThreadIdPool::put(size_t id) | |
{ | |
pthread_setspecific(s_tkey, NULL); | |
int i = _counter.decAndReturn(); | |
assert(i >= 0); | |
_ids[i] = id; | |
} | |
#ifndef __THREAD_ID_POOL_H_ | |
#define __THREAD_ID_POOL_H_ | |
#include <autil/AtomicCounter.h> | |
#include <cm_basic/common/ref_obj.h> | |
#define TPG_TID_COUNT 2048 | |
// internal use only | |
class ThreadIdPool : public cm_basic::RefObj | |
{ | |
public: | |
struct TVal { | |
ThreadIdPool *self; | |
size_t id; | |
}; | |
TVal *createVal(size_t id); | |
void destroyVal(TVal *val); | |
public: | |
ThreadIdPool(); | |
size_t get(); | |
void put(size_t id); | |
private: | |
autil::AtomicCounter _counter; | |
size_t _ids[TPG_TID_COUNT]; | |
}; | |
#endif | |
#include "thread_mark_gc.h" | |
#include "thread_id_pool.h" | |
#include <string.h> | |
// Maps queue position n to its slot in the pending ring:
// (n - 1) mod TPG_MAX_PENDING. The argument is parenthesized so that
// compound expressions (e.g. shifts or conditionals) expand as one operand.
#define PENDING_POS(n) (((n) - 1) % TPG_MAX_PENDING)
/////////////////////////////////////////////////////////////////////////////// | |
// Builds an empty collector: creates its id pool and holds one reference on
// it, then zeroes the queue cursors and all per-thread bookkeeping.
ThreadMarkGC::ThreadMarkGC()
{
    ThreadIdPool *pool = new ThreadIdPool();
    pool->addRef();
    _idPool = pool;

    _tail = 0;
    _head = 0;
    memset(_pendings, 0, sizeof(_pendings));
    memset(_tmark, 0, sizeof(_tmark));
    memset(_tfreeds, 0, sizeof(_tfreeds));
}

// Drops the reference on the id pool taken by the constructor. Pointers
// still queued in _pendings are not touched here.
ThreadMarkGC::~ThreadMarkGC()
{
    _idPool->release();
}
int ThreadMarkGC::release(void **out) | |
{ | |
return gc(out, _head); | |
} | |
int ThreadMarkGC::gc(void **out) | |
{ | |
uint64_t pos = findFreePos(); | |
return gc(out, pos); | |
} | |
int ThreadMarkGC::gc(void **out, uint64_t pos) | |
{ | |
int count = 0; | |
while (_tail != pos) { // != to avoid overflow | |
out[count++] = _pendings[PENDING_POS(_tail)]; | |
_tail ++; | |
} | |
return count; | |
} | |
bool ThreadMarkGC::update() | |
{ | |
size_t tid = _idPool->get(); | |
if (tid == 0 || tid > TPG_MAX_THREAD) return false; | |
// i'm ok now, post the signal to the update thread | |
_tmark[tid - 1] = _head; | |
return true; | |
} | |
bool ThreadMarkGC::deferFree(void *p) | |
{ | |
if (_head - _tail >= TPG_MAX_PENDING) { | |
// fatal error, memory leak | |
return false; | |
} | |
uint32_t n = PENDING_POS(_head); | |
_head ++; | |
_pendings[n] = p; | |
return true; | |
} | |
uint64_t ThreadMarkGC::findFreePos() | |
{ | |
int tcnt = TPG_MAX_THREAD; | |
// if all threads are inactive, return _head | |
uint64_t min = (uint64_t)-1, pos = _head; | |
bool found = false; | |
for (int i = 0; i < tcnt; ++i) { | |
uint64_t tpos = _tmark[i]; | |
if (tpos == _tfreeds[i]) continue; | |
// compare offset to avoid uint overflow | |
uint64_t offset = tpos - _tail; | |
if (offset < min) { | |
min = offset; | |
pos = tpos; | |
found = true; | |
} | |
} | |
if (found) { | |
// update all threads' last freed position | |
for (int i = 0; i < tcnt; ++i) { | |
// only update these active threads | |
if (_tmark[i] != _tfreeds[i]) | |
_tfreeds[i] = pos; | |
} | |
} | |
return pos; | |
} | |
#ifndef __THREAD_MARK_GC_H_ | |
#define __THREAD_MARK_GC_H_ | |
#include <stdint.h> | |
#define TPG_MAX_THREAD 2048 | |
#define TPG_MAX_PENDING 8 | |
class ThreadIdPool; | |
class ThreadMarkGC | |
{ | |
public: | |
ThreadMarkGC(); | |
~ThreadMarkGC(); | |
int release(void **out); | |
// update thread calls, collect these safe to free memories | |
int gc(void **out); | |
// reader thread calls, to notify leaving the critical section | |
bool update(); | |
// update thread calls, to defer free a memory | |
bool deferFree(void *p); | |
private: | |
int gc(void **out, uint64_t pos); | |
uint64_t findFreePos(); | |
private: | |
ThreadIdPool *_idPool; | |
void *_pendings[TPG_MAX_PENDING]; | |
uint64_t _head, _tail; | |
uint64_t _tmark[TPG_MAX_THREAD]; | |
uint64_t _tfreeds[TPG_MAX_THREAD]; | |
}; | |
// TPG_DO_GC(type, tpg): calls (tpg).gc() and deletes every returned pointer
// as `type*`.
// TPG_DO_RELEASE(type, tpg): same, but through (tpg).release().
// The helper locals use tpg_-prefixed names so they cannot capture an
// identifier that appears in the `tpg` argument, and the buffer is passed as
// a plain array (decaying to void**) rather than through a cast of its
// address.
#define TPG_DO_GC(type, tpg) { \
    void *tpg_ptrs_[TPG_MAX_PENDING]; \
    int tpg_cnt_ = (tpg).gc(tpg_ptrs_); \
    while (tpg_cnt_ > 0) delete static_cast<type*>(tpg_ptrs_[--tpg_cnt_]); \
}
#define TPG_DO_RELEASE(type, tpg) { \
    void *tpg_ptrs_[TPG_MAX_PENDING]; \
    int tpg_cnt_ = (tpg).release(tpg_ptrs_); \
    while (tpg_cnt_ > 0) delete static_cast<type*>(tpg_ptrs_[--tpg_cnt_]); \
}
#endif | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment