@markpapadakis
Created August 21, 2018 14:20
#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstdlib>
#include <limits>
#include <new>
#include <thread>
#include <utility>

struct Thread {
    struct thread_id final {
        uint32_t id_;

        thread_id() {
            // ids start at 1: slot 0 of LightEpoch's epoch table is left unused,
            // and compute_new_safe_to_reclaim_epoch() scans entries [1, num_entries_]
            static std::atomic<uint32_t> next{1};

            id_ = next.fetch_add(1);
        }

        inline auto id() const noexcept {
            return id_;
        }
    };

    // one thread_id per thread, assigned lazily on first use
    static thread_local inline thread_id tid_{};

    static inline auto id() {
        return tid_.id();
    }

    static void set_id(const uint32_t v) {
        tid_.id_ = v;
    }
};
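// A small sketch (not part of the original gist) of what the Thread helper provides:
// the first call to Thread::id() on a given thread constructs its thread_local
// thread_id, which takes the next counter value, so each thread ends up with a
// stable, distinct index it can use to address its own slot in LightEpoch's
// epoch table.
#include <cstdio>

[[maybe_unused]] static void thread_id_demo() {
    auto worker = [] {
        // prints a distinct, stable id per thread
        std::printf("Thread::id() = %u\n", static_cast<unsigned>(Thread::id()));
    };

    std::thread t1{worker}, t2{worker};
    t1.join();
    t2.join();
}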
class LightEpoch {
    using context_t = void *;
    typedef void (*callback_t)(context_t);

private:
    // per-thread slot in the epoch table, padded to a cache line
    struct alignas(64) entry final {
        uint64_t local_current_epoch_{0};
        uint32_t reentrant_{0};
        std::atomic<unsigned> phase_finished_{0};
    };

    // a deferred action, triggered once its epoch becomes safe to reclaim
    struct epoch_action final {
        static constexpr uint64_t K_free = std::numeric_limits<uint64_t>::max();
        static constexpr uint64_t K_locked = std::numeric_limits<uint64_t>::max() - 1;

        void reset() {
            epoch_ = K_free;
            callback_ = nullptr;
            context_ = nullptr;
        }

        epoch_action() {
            reset();
        }

        bool is_free() const noexcept {
            return epoch_.load() == K_free;
        }

        bool try_pop(uint64_t expected_epoch) {
            const auto r = epoch_.compare_exchange_strong(expected_epoch, K_locked);

            if (r) {
                auto callback = std::exchange(callback_, nullptr);
                auto ctx = std::exchange(context_, nullptr);

                // release the lock
                epoch_.store(K_free);
                // perform the action
                (*callback)(ctx);
            }
            return r;
        }

        bool try_push(uint64_t prior_epoch, callback_t new_callback, context_t new_context) {
            auto expected_epoch = K_free;
            const auto r = epoch_.compare_exchange_strong(expected_epoch, K_locked);

            if (r) {
                callback_ = new_callback;
                context_ = new_context;

                // release the lock
                epoch_.store(prior_epoch);
            }
            return r;
        }

        bool try_swap(uint64_t expected_epoch, const uint64_t prior_epoch, callback_t new_callback, context_t new_context) {
            const auto r = epoch_.compare_exchange_strong(expected_epoch, K_locked);

            if (r) {
                auto existing_callback = std::exchange(callback_, new_callback);
                auto existing_context = std::exchange(context_, new_context);

                // release the lock
                epoch_.store(prior_epoch);
                // perform the displaced action
                (*existing_callback)(existing_context);
            }
            return r;
        }

        // Always read it first, and update it last
        std::atomic<uint64_t> epoch_;
        callback_t callback_;
        context_t context_;
    };
    static constexpr size_t K_table_size = 96;
    static constexpr size_t K_drain_list_size = 256;
    // this thread is not protecting any epoch
    static constexpr uint64_t K_unprotected = 0;

    // epoch table (one slot per thread)
    entry *table_;
    // number of entries in the epoch table
    size_t num_entries_;
    // list of (epoch, action) pairs: actions to be performed once their epoch becomes safe to reclaim
    epoch_action drain_list_[K_drain_list_size];
    // pending drain actions
    std::atomic<uint32_t> drain_count_;
    // current system epoch (global state)
    std::atomic<uint64_t> current_epoch_;
    // cached value of the epoch that is safe to reclaim
    std::atomic<uint64_t> safe_to_reclaim_epoch_;

private:
    void init(const size_t size) {
        num_entries_ = size;
        // cache alignment
        table_ = reinterpret_cast<entry *>(aligned_alloc(64, (size + 2) * sizeof(entry)));
        // construct slots 0..size in place; slot 0 is unused because thread ids start at 1
        new (table_) entry[size + 1];
        current_epoch_ = 1;
        safe_to_reclaim_epoch_ = 0;
        drain_count_ = 0;
        for (size_t i{0}; i < K_drain_list_size; ++i)
            drain_list_[i].reset();
    }

    uint64_t compute_new_safe_to_reclaim_epoch(const uint64_t current_epoch) {
        auto oldest_ongoing_call = current_epoch;

        for (size_t i{1}; i <= num_entries_; ++i) {
            if (const auto entry_epoch = table_[i].local_current_epoch_; entry_epoch != K_unprotected && entry_epoch < oldest_ongoing_call)
                oldest_ongoing_call = entry_epoch;
        }
        return safe_to_reclaim_epoch_ = oldest_ongoing_call - 1;
    }

public:
    LightEpoch(const size_t size = K_table_size)
        : table_{nullptr}, num_entries_{0}, drain_list_{}, drain_count_{0} {
        init(size);
    }

    ~LightEpoch() {
        free(table_);
    }
    // Enter the thread into the protected code region.
    // Protection is factored into compute_new_safe_to_reclaim_epoch(),
    // where we determine the oldest epoch still held by any thread
    // that is protected (i.e. "busy").
    //
    // The idea is that when you begin a new session/transaction
    // you protect(), and when you are done, you unprotect().
    // Between those two calls, if you make a change you bump_current_epoch(),
    // which signals that the current epoch was spent doing something and needs to advance to the next
    // (and you also get to provide a callback for when it's safe to execute it).
    // See the usage sketch after this class for the full pattern.
    inline auto protect() {
        const auto tid = Thread::id();

        return table_[tid].local_current_epoch_ = current_epoch_.load();
    }

    // enter the thread into the protected code region; process entries in the drain list if possible
    auto protect_and_drain() {
        const auto tid = Thread::id();
        const auto res = (table_[tid].local_current_epoch_ = current_epoch_.load());

        if (drain_count_.load()) {
            drain(res);
        }
        return res;
    }

    auto reentrant_protect() {
        const auto tid = Thread::id();

        if (const auto res = table_[tid].local_current_epoch_; res != K_unprotected)
            return res;

        const auto res = (table_[tid].local_current_epoch_ = current_epoch_.load());

        table_[tid].reentrant_++;
        return res;
    }

    bool is_protected() const noexcept {
        return table_[Thread::id()].local_current_epoch_ != K_unprotected;
    }

    // exit the thread from the protected code region
    void unprotect() {
        table_[Thread::id()].local_current_epoch_ = K_unprotected;
    }

    void reentrant_unprotect() {
        const auto tid = Thread::id();

        if (0 == --table_[tid].reentrant_) {
            table_[tid].local_current_epoch_ = K_unprotected;
        }
    }

    // recompute the safe-to-reclaim epoch and run any deferred actions that became safe
    void drain(const uint64_t next_epoch) {
        compute_new_safe_to_reclaim_epoch(next_epoch);

        for (size_t i{0}; i < K_drain_list_size; ++i) {
            const auto trigger_epoch = drain_list_[i].epoch_.load();

            if (trigger_epoch <= safe_to_reclaim_epoch_) {
                if (drain_list_[i].try_pop(trigger_epoch)) {
                    if (0 == --drain_count_) {
                        break;
                    }
                }
            }
        }
    }
    // increment the current epoch (global system state)
    auto bump_current_epoch() {
        const auto next_epoch = ++current_epoch_;

        if (drain_count_) {
            drain(next_epoch);
        }
        return next_epoch;
    }

    // increment the current epoch (global system state) and register
    // a trigger action for when the older epoch becomes safe to reclaim
    auto bump_current_epoch(callback_t callback, context_t context) {
        const auto prior_epoch = bump_current_epoch() - 1;
        uint32_t i{0}, j{0};

        for (;;) {
            const auto trigger_epoch = drain_list_[i].epoch_.load();

            if (trigger_epoch == epoch_action::K_free) {
                if (drain_list_[i].try_push(prior_epoch, callback, context)) {
                    ++drain_count_;
                    break;
                }
            } else if (trigger_epoch <= safe_to_reclaim_epoch_.load()) {
                if (drain_list_[i].try_swap(trigger_epoch, prior_epoch, callback, context)) {
                    ++drain_count_;
                    break;
                }
            }

            if (++i == K_drain_list_size) {
                i = 0;
                if (++j == 512) {
                    j = 0;
                    std::this_thread::sleep_for(std::chrono::seconds(1));
                }
            }
        }
    }

    bool safe_to_reclaim(const uint64_t epoch) const noexcept {
        return epoch <= safe_to_reclaim_epoch_;
    }
};
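// ---------------------------------------------------------------------------
// Usage sketch (not part of the original gist). It follows the pattern the
// comments on protect() describe: each thread brackets a session with
// protect_and_drain() / unprotect(), and whenever it retires a shared object
// it calls bump_current_epoch() with a callback that frees the object once no
// protected thread can still be observing it. The global epoch instance,
// worker count and buffer size below are made up for the illustration.
#include <cstdio>
#include <vector>

static LightEpoch g_epoch;

static void reclaim_buffer(void *p) {
    // runs only once the epoch in which the buffer was retired is safe to reclaim
    std::free(p);
}

int main() {
    std::vector<std::thread> workers;

    for (int w = 0; w < 4; ++w) {
        workers.emplace_back([] {
            for (int iter = 0; iter < 1000; ++iter) {
                // begin a session: this thread now pins the current epoch
                g_epoch.protect_and_drain();

                // pretend we just unlinked a buffer from a shared structure
                void *retired = std::malloc(128);

                // advance the global epoch and defer the free until every
                // thread has moved past the epoch in which the buffer was visible
                g_epoch.bump_current_epoch(reclaim_buffer, retired);

                // end the session
                g_epoch.unprotect();
            }
        });
    }

    for (auto &t : workers)
        t.join();

    // note: actions still pending in the drain list at exit are never run in
    // this sketch; a real owner would drain before shutdown
    std::printf("done\n");
    return 0;
}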