Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
simple read/write thread memory gc
#include "thread_id_pool.h"
#include <pthread.h>
#include <assert.h>
static pthread_key_t s_tkey;
static pthread_once_t s_tkey_once;
static void freeKey(void *p)
{
ThreadIdPool::TVal *val = (ThreadIdPool::TVal*) p;
val->self->destroyVal(val);
}
static void makeKey()
{
pthread_key_create(&s_tkey, freeKey);
}
ThreadIdPool::TVal *ThreadIdPool::createVal(size_t id)
{
TVal *val = new TVal();
val->self = this;
val->self->addRef();
val->id = id;
return val;
}
void ThreadIdPool::destroyVal(TVal *val)
{
assert(val->self == this);
val->self->put(val->id);
val->self->release();
delete val;
}
ThreadIdPool::ThreadIdPool()
{
for (int i = 0; i < TPG_TID_COUNT; ++i) {
_ids[i] = i + 1;
}
}
size_t ThreadIdPool::get()
{
pthread_once(&s_tkey_once, makeKey);
TVal *val = (TVal*) pthread_getspecific(s_tkey);
if (val != NULL) return val->id;
int i = _counter.incAndReturn() - 1;
if (i >= TPG_TID_COUNT) return 0;
size_t id = _ids[i];
pthread_setspecific(s_tkey, createVal(id));
return id;
}
void ThreadIdPool::put(size_t id)
{
pthread_setspecific(s_tkey, NULL);
int i = _counter.decAndReturn();
assert(i >= 0);
_ids[i] = id;
}
#ifndef __THREAD_ID_POOL_H_
#define __THREAD_ID_POOL_H_
#include <autil/AtomicCounter.h>
#include <cm_basic/common/ref_obj.h>
#define TPG_TID_COUNT 2048
// internal use only
class ThreadIdPool : public cm_basic::RefObj
{
public:
struct TVal {
ThreadIdPool *self;
size_t id;
};
TVal *createVal(size_t id);
void destroyVal(TVal *val);
public:
ThreadIdPool();
size_t get();
void put(size_t id);
private:
autil::AtomicCounter _counter;
size_t _ids[TPG_TID_COUNT];
};
#endif
#include "thread_mark_gc.h"
#include "thread_id_pool.h"
#include <string.h>
#define PENDING_POS(n) ((n - 1) % TPG_MAX_PENDING)
///////////////////////////////////////////////////////////////////////////////
ThreadMarkGC::ThreadMarkGC()
{
_idPool = new ThreadIdPool();
_idPool->addRef();
_head = _tail = 0;
memset(_pendings, 0, sizeof(_pendings));
memset(_tmark, 0, sizeof(_tmark));
memset(_tfreeds, 0, sizeof(_tfreeds));
}
ThreadMarkGC::~ThreadMarkGC()
{
_idPool->release();
}
int ThreadMarkGC::release(void **out)
{
return gc(out, _head);
}
int ThreadMarkGC::gc(void **out)
{
uint64_t pos = findFreePos();
return gc(out, pos);
}
int ThreadMarkGC::gc(void **out, uint64_t pos)
{
int count = 0;
while (_tail != pos) { // != to avoid overflow
out[count++] = _pendings[PENDING_POS(_tail)];
_tail ++;
}
return count;
}
bool ThreadMarkGC::update()
{
size_t tid = _idPool->get();
if (tid == 0 || tid > TPG_MAX_THREAD) return false;
// i'm ok now, post the signal to the update thread
_tmark[tid - 1] = _head;
return true;
}
bool ThreadMarkGC::deferFree(void *p)
{
if (_head - _tail >= TPG_MAX_PENDING) {
// fatal error, memory leak
return false;
}
uint32_t n = PENDING_POS(_head);
_head ++;
_pendings[n] = p;
return true;
}
uint64_t ThreadMarkGC::findFreePos()
{
int tcnt = TPG_MAX_THREAD;
// if all threads are inactive, return _head
uint64_t min = (uint64_t)-1, pos = _head;
bool found = false;
for (int i = 0; i < tcnt; ++i) {
uint64_t tpos = _tmark[i];
if (tpos == _tfreeds[i]) continue;
// compare offset to avoid uint overflow
uint64_t offset = tpos - _tail;
if (offset < min) {
min = offset;
pos = tpos;
found = true;
}
}
if (found) {
// update all threads' last freed position
for (int i = 0; i < tcnt; ++i) {
// only update these active threads
if (_tmark[i] != _tfreeds[i])
_tfreeds[i] = pos;
}
}
return pos;
}
#ifndef __THREAD_MARK_GC_H_
#define __THREAD_MARK_GC_H_
#include <stdint.h>
#define TPG_MAX_THREAD 2048
#define TPG_MAX_PENDING 8
class ThreadIdPool;
class ThreadMarkGC
{
public:
ThreadMarkGC();
~ThreadMarkGC();
int release(void **out);
// update thread calls, collect these safe to free memories
int gc(void **out);
// reader thread calls, to notify leaving the critical section
bool update();
// update thread calls, to defer free a memory
bool deferFree(void *p);
private:
int gc(void **out, uint64_t pos);
uint64_t findFreePos();
private:
ThreadIdPool *_idPool;
void *_pendings[TPG_MAX_PENDING];
uint64_t _head, _tail;
uint64_t _tmark[TPG_MAX_THREAD];
uint64_t _tfreeds[TPG_MAX_THREAD];
};
#define TPG_DO_GC(type, tpg) { \
void *ps[TPG_MAX_PENDING]; \
int cnt = (tpg).gc((void**)&ps); \
while (cnt > 0) delete (type*)ps[--cnt]; \
}
#define TPG_DO_RELEASE(type, tpg) { \
void *ps[TPG_MAX_PENDING]; \
int cnt = (tpg).release((void**)&ps); \
while (cnt > 0) delete (type*)ps[--cnt]; \
}
#endif
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment