Created
August 21, 2018 14:20
-
-
Save markpapadakis/16050d0db99733eec8734a298061ef17 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <atomic> | |
#include <thread> | |
struct Thread { | |
struct thread_id final { | |
uint32_t id_; | |
thread_id() { | |
static std::atomic<uint32_t> next{0}; | |
id_ = next.fetch_add(1); | |
} | |
inline auto id() const noexcept { | |
return id_; | |
} | |
}; | |
static thread_local inline thread_id tid_{}; | |
static inline auto id() { | |
return tid_.id(); | |
} | |
static void set_id(const uint32_t v) { | |
tid_.id_ = v; | |
} | |
}; | |
class LightEpoch { | |
using context_t = void *; | |
typedef void (*callback_t)(context_t); | |
private: | |
struct alignas(64) entry final { | |
uint64_t local_current_epoch_{0}; | |
uint32_t reentrant_{0}; | |
std::atomic<unsigned> phase_finished_{0}; | |
}; | |
struct epoch_action final { | |
static constexpr uint64_t K_free = std::numeric_limits<uint64_t>::max(); | |
static constexpr uint64_t K_locked = std::numeric_limits<uint64_t>::max() - 1; | |
void reset() { | |
epoch_ = K_free; | |
callback_ = nullptr; | |
context_ = nullptr; | |
} | |
epoch_action() { | |
reset(); | |
} | |
bool is_free() const noexcept { | |
return epoch_.load() == K_free; | |
} | |
bool try_pop(uint64_t expected_epoch) { | |
const auto r = epoch_.compare_exchange_strong(expected_epoch, K_locked); | |
if (r) { | |
auto callback = std::exchange(callback_, nullptr); | |
auto ctx = std::exchange(context_, nullptr); | |
// release the lock | |
epoch_.store(K_free); | |
// perform the action | |
(*callback)(ctx); | |
} | |
return r; | |
} | |
bool try_push(uint64_t prior_epoch, callback_t new_callback, context_t new_context) { | |
auto expected_epoch = K_free; | |
const auto r = epoch_.compare_exchange_strong(expected_epoch, K_locked); | |
if (r) { | |
callback_ = new_callback; | |
context_ = new_context; | |
// Release the lock | |
epoch_.store(prior_epoch); | |
} | |
return r; | |
} | |
bool try_swap(uint64_t expected_epoch, const uint64_t prior_epoch, callback_t new_callback, context_t new_context) { | |
const auto r = epoch_.compare_exchange_strong(expected_epoch, K_locked); | |
if (r) { | |
auto existing_callback = std::exchange(callback_, new_callback); | |
auto existing_context = std::exchange(context_, new_context); | |
// release the lock | |
epoch_.store(prior_epoch); | |
// perform the action | |
(*existing_callback)(existing_context); | |
} | |
return r; | |
} | |
// Always read it first, and update it laste | |
std::atomic<uint64_t> epoch_; | |
callback_t callback_; | |
context_t context_; | |
}; | |
static constexpr size_t K_table_size = 96; | |
static constexpr size_t K_drain_list_size = 256; | |
// this thread is not protecting any epoch | |
static constexpr uint64_t K_unprotected = 0; | |
// epoch table | |
entry *table_; | |
// number of entries in epoch table | |
size_t num_entries_; | |
// list of actions, epoch pairs containing actions to be performed when an epoch becomes safe to reclaim | |
epoch_action drain_list_[K_drain_list_size]; | |
// pending drain actions | |
std::atomic<uint32_t> drain_count_; | |
// current system epoch(global state) | |
std::atomic<uint64_t> current_epoch_; | |
// cached value of epoch that is safe to reclaim | |
std::atomic<uint64_t> safe_to_relcaim_epoch_; | |
private: | |
void init(const size_t size) { | |
num_entries_ = size; | |
// cache alignment | |
table_ = reinterpret_cast<entry *>(aligned_alloc(64, (size + 2) * sizeof(entry))); | |
new (table_) entry[size + 1]; | |
current_epoch_ = 1; | |
safe_to_relcaim_epoch_ = 0; | |
drain_count_ = 0; | |
for (size_t i{0}; i < K_drain_list_size; ++i) | |
drain_list_[i].reset(); | |
} | |
uint64_t compute_new_safe_to_reclaim_epoch(const uint64_t current_epoch) { | |
auto oldest_ongoing_call = current_epoch; | |
for (size_t i{1}; i <= num_entries_; ++i) { | |
if (const auto entry_epoch = table_[i].local_current_epoch_; entry_epoch != K_unprotected && entry_epoch < oldest_ongoing_call) | |
oldest_ongoing_call = entry_epoch; | |
} | |
return safe_to_relcaim_epoch_ = oldest_ongoing_call - 1; | |
} | |
public: | |
LightEpoch(const size_t size = K_table_size) | |
: table_{nullptr}, num_entries_{0}, drain_count_{0}, drain_list_{} { | |
init(size); | |
} | |
~LightEpoch() { | |
free(table_); | |
} | |
// enter the thread into the protected code region | |
// protection is factored in in compute_new_safe_to_reclaim_epoch() | |
// | |
// whereas we determine the oldest safest to unclaim epoch among all | |
// threads that are protected (i.e "busy") | |
// | |
// The idea is that when you begin a new session/transaction | |
// you protect() and when you are done, you unprotect() | |
// Between those two calls, if you make a change you bump_current_epoch() | |
// which sigals that this current epoch was spent doing something and need to advance to the next | |
// (and you get to also provide a callback for when it's safe to execute it) | |
inline auto protect() { | |
const auto tid = Thread::id(); | |
return table_[tid].local_current_epoch_ = current_epoch_.load(); | |
} | |
// enter the thread into the protected code region; proecess entries in the drain list if possible | |
auto protect_and_drain() { | |
const auto tid = Thread::id(); | |
const auto res = (table_[tid].local_current_epoch_ = current_epoch_.load()); | |
if (drain_count_.load()) { | |
drain(res); | |
} | |
return res; | |
} | |
auto reentrant_protect() { | |
const auto tid = Thread::id(); | |
if (const auto res = table_[tid].local_current_epoch_; res != K_unprotected) | |
return res; | |
const auto res = (table_[tid].local_current_epoch_ = current_epoch_.load()); | |
table_[tid].reentrant_++; | |
return res; | |
} | |
bool is_protected() const noexcept { | |
return table_[Thread::id()].local_current_epoch_ == K_unprotected; | |
} | |
// exit the thread from the protected code region | |
void unprotect() { | |
table_[Thread::id()].local_current_epoch_ = K_unprotected; | |
} | |
void reentrant_unprotect() { | |
const auto tid = Thread::id(); | |
if (0 == --table_[tid].reentrant_) { | |
table_[tid].local_current_epoch_ = K_unprotected; | |
} | |
} | |
void drain(const uint64_t next_epoch) { | |
compute_new_safe_to_reclaim_epoch(next_epoch); | |
for (size_t i{0}; i < K_drain_list_size; ++i) { | |
const auto trigger_epoch = drain_list_[i].epoch_.load(); | |
if (trigger_epoch <= safe_to_relcaim_epoch_) { | |
if (drain_list_[i].try_pop(trigger_epoch)) { | |
if (0 == --drain_count_) { | |
break; | |
} | |
} | |
} | |
} | |
} | |
// increment the current epoch(global system state) | |
auto bump_current_epoch() { | |
const auto next_epoch = ++current_epoch_; | |
if (drain_count_) { | |
drain(next_epoch); | |
} | |
return next_epoch; | |
} | |
// increment the current epoch(global system state) and register | |
// a trigger action for when the older epoch becomes safe to relcaim | |
auto bump_current_epoch(callback_t callback, context_t context) { | |
const auto prior_epoch = bump_current_epoch() - 1; | |
uint32_t i{0}, j{0}; | |
for (;;) { | |
const auto trigger_epoch = drain_list_[i].epoch_.load(); | |
if (trigger_epoch == epoch_action::K_free) { | |
if (drain_list_[i].try_push(prior_epoch, callback, context)) { | |
++drain_count_; | |
break; | |
} | |
} else if (trigger_epoch <= safe_to_relcaim_epoch_.load()) { | |
if (drain_list_[i].try_swap(trigger_epoch, prior_epoch, callback, context)) { | |
++drain_count_; | |
break; | |
} | |
} | |
if (++i == K_drain_list_size) { | |
i = 0; | |
if (++j == 512) { | |
j = 0; | |
std::this_thread::sleep_for(std::chrono::seconds(1)); | |
} | |
} | |
} | |
} | |
bool safe_to_reclaim(const uint64_t epoch) const noexcept { | |
return epoch <= safe_to_relcaim_epoch_; | |
} | |
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment