Skip to content

Instantly share code, notes, and snippets.

@SF-Zhou
Last active March 24, 2023 03:37
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save SF-Zhou/51c9e36e74c41d20abe94549d1ffe17f to your computer and use it in GitHub Desktop.
Save SF-Zhou/51c9e36e74c41d20abe94549d1ffe17f to your computer and use it in GitHub Desktop.
Lock Free Double Buffer
#include <atomic>
#include <cassert>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>
/// A minimal hazard-pointer registry for objects of type `T`.
///
/// Each `HazardPointer<T>` instance is one slot in a per-`T` singly linked
/// list reachable from `Head()`. A reader claims a slot, publishes the pointer
/// it is about to dereference in `target_`, and gives the slot back through
/// `Guard`'s destructor. A writer that swaps a shared pointer with `Update()`
/// spins until no slot publishes the pointer it just removed.
///
/// Slots are only ever pushed onto the list; nothing in this type deletes
/// them, so they are recycled through `active_` rather than freed.
template <class T>
struct HazardPointer {
 private:
  /// Pointer currently protected by this slot; `nullptr` while idle.
  std::atomic<T *> target_{nullptr};
  /// Next slot in the list. Written only before the slot is published.
  HazardPointer<T> *next_ = nullptr;
  /// Set while some `Guard` owns this slot.
  std::atomic_flag active_ = ATOMIC_FLAG_INIT;

 public:
  /// Scoped owner of one claimed slot; releases the slot on destruction.
  ///
  /// Move-only: two guards must never refer to the same slot, because the
  /// second `Release()` could clear a hazard that another thread has published
  /// after re-claiming the slot.
  class Guard {
   public:
    explicit Guard(HazardPointer<T> *pointer) : pointer_(pointer) {}
    Guard(const Guard &) = delete;
    Guard &operator=(const Guard &) = delete;
    /// Transfers slot ownership; the moved-from guard becomes empty.
    Guard(Guard &&other) noexcept : pointer_(other.pointer_) {
      other.pointer_ = nullptr;
    }
    Guard &operator=(Guard &&) = delete;
    ~Guard() {
      if (pointer_ != nullptr) {
        pointer_->Release();
      }
    }

    /// True when the guard owns a slot that protects a non-null pointer.
    operator bool() const {
      return pointer_ != nullptr &&
             pointer_->target_.load(std::memory_order_acquire) != nullptr;
    }

    /// Access to the protected object; `nullptr` for an empty guard.
    T *operator->() const {
      return pointer_ != nullptr
                 ? pointer_->target_.load(std::memory_order_acquire)
                 : nullptr;
    }

   private:
    HazardPointer<T> *pointer_;
  };

  /// Claims a slot and publishes the current value of `target` in it.
  ///
  /// The publish/re-read loop repeats until the published value still equals
  /// `target`, so a concurrent `Update()` either sees the hazard or the
  /// reader retries with the new value.
  ///
  /// @param target shared pointer the caller wants to read through.
  /// @return guard that keeps the published pointer protected while alive.
  static Guard Acquire(const std::atomic<T *> &target) {
    HazardPointer<T> *slot = Alloc();
    T *observed = nullptr;
    do {
      observed = target.load(std::memory_order_acquire);
      // Sequentially consistent store + re-read: the hazard must be visible
      // before we trust that `target` has not been swapped underneath us.
      slot->target_.store(observed);
    } while (observed != target.load());
    return Guard(slot);
  }

  /// Clears the published pointer and marks the slot as reusable.
  void Release() {
    target_.store(nullptr, std::memory_order_release);
    active_.clear(std::memory_order_release);
  }

  /// Replaces `target` with `new_target` and waits until the previous value
  /// is no longer published by any slot.
  ///
  /// This busy-waits while a reader still holds a guard on the old pointer.
  ///
  /// @param target shared pointer to swap.
  /// @param new_target value to install.
  /// @return the previous value of `target`, now unreferenced by any slot.
  static T *Update(std::atomic<T *> &target, T *new_target) {
    T *old = target.exchange(new_target);
    if (old == nullptr) {
      // Idle slots hold nullptr, so scanning for a null `old` would match
      // them and spin forever. There is nothing to protect anyway.
      return old;
    }
    for (;;) {
      bool in_use = false;
      // Default (seq_cst) load: together with the seq_cst exchange above it
      // guarantees we observe any slot pushed by a reader that still saw the
      // old value of `target`.
      for (HazardPointer<T> *slot = Head().load(); slot != nullptr;
           slot = slot->next_) {
        if (slot->target_.load() == old) {
          in_use = true;
          break;
        }
      }
      if (!in_use) {
        return old;
      }
    }
  }

 private:
  /// Head of the per-`T` slot list.
  static std::atomic<HazardPointer<T> *> &Head() {
    static std::atomic<HazardPointer<T> *> head{nullptr};
    return head;
  }

  /// Returns a slot owned by the caller: an idle one from the list if
  /// available, otherwise a freshly allocated slot pushed onto the list.
  static HazardPointer<T> *Alloc() {
    for (HazardPointer<T> *slot = Head().load(std::memory_order_acquire);
         slot != nullptr; slot = slot->next_) {
      if (!slot->active_.test_and_set()) {
        return slot;
      }
    }
    HazardPointer<T> *fresh = new HazardPointer<T>();
    fresh->active_.test_and_set();
    // On failure compare_exchange_weak stores the current head into
    // `fresh->next_`, which is exactly the link we need for the retry.
    fresh->next_ = Head().load(std::memory_order_acquire);
    while (!Head().compare_exchange_weak(fresh->next_, fresh)) {
    }
    return fresh;
  }
};
/// Two-copy container: readers see one copy through a hazard-pointer guard
/// while the writer mutates the other copy and then swaps them.
///
/// `T` must be default-constructible and provide an `Update(...)` member that
/// accepts the arguments passed to `Modify()`.
///
/// `Modify()` reads and writes the non-atomic `write_` member, so it must be
/// called from one thread at a time. `Read()` may be called concurrently.
template <typename T>
class DoubleBuffer {
 public:
  using HP = HazardPointer<T>;

  DoubleBuffer() = default;
  DoubleBuffer(const DoubleBuffer &) = delete;
  DoubleBuffer &operator=(const DoubleBuffer &) = delete;

  /// Frees both copies. The caller must ensure that no guard returned by
  /// `Read()` is still alive and no `Modify()` is in flight.
  ~DoubleBuffer() {
    delete read_.load(std::memory_order_acquire);
    delete write_;
  }

  /// Returns a guard that protects the current read copy while it is alive.
  typename HP::Guard Read() { return HP::Acquire(read_); }

  /// Applies `T::Update(args...)` to both copies.
  ///
  /// The hidden copy is updated first and then published; `HP::Update` hands
  /// back the previously visible copy once no reader guards it, and the same
  /// update is replayed on it. The arguments are passed as lvalues because
  /// they are used twice.
  template <typename... Args>
  void Modify(Args &&... args) {
    write_->Update(args...);
    write_ = HP::Update(read_, write_);
    write_->Update(args...);
  }

 private:
  /// Copy currently visible to readers.
  std::atomic<T *> read_{new T};
  /// Copy owned by the writer.
  T *write_{new T};
};
/// Test payload whose `Update` raises a flag for the duration of some busy
/// work, so a reader can detect that it is looking at a copy being modified.
struct Data {
  /// True only while `Update` is running on this object.
  bool is_updating = false;

  /// Marks the object as being updated, performs `loop_count` throwaway heap
  /// allocations as simulated work, then clears the mark.
  void Update(size_t loop_count) {
    is_updating = true;
    size_t remaining = loop_count;
    while (remaining > 0) {
      std::unique_ptr<int> scratch(new int(1));
      --remaining;
    }
    is_updating = false;
  }
};
/// Stress driver: one writer thread repeatedly calls `Modify` while several
/// reader threads call `Read` and assert that the copy they see is never in
/// the middle of an update.
int main() {
  constexpr auto kThreads = 4;
  constexpr auto kInnerCount = 100;
  constexpr auto kOuterCount = 100000;

  DoubleBuffer<Data> double_buffer;
  std::atomic<bool> start{false};

  // Spin until the main thread opens the gate so all workers begin together.
  const auto await_start = [&start] {
    while (!start.load()) {
    }
  };

  std::thread writer([&] {
    await_start();
    size_t remaining = kOuterCount * kThreads;
    while (remaining-- > 0) {
      double_buffer.Modify(kInnerCount);
    }
  });

  std::vector<std::thread> readers;
  for (int reader_index = 0; reader_index < kThreads; ++reader_index) {
    readers.emplace_back([&] {
      await_start();
      for (size_t iteration = 0; iteration < kOuterCount * kInnerCount;
           ++iteration) {
        auto data = double_buffer.Read();
        assert(data);
        assert(data->is_updating == false);
      }
    });
  }

  start.store(true);

  // Readers are joined before the writer.
  for (std::thread &reader : readers) {
    reader.join();
  }
  writer.join();
  return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment