Skip to content

Instantly share code, notes, and snippets.

@fwoodruff
Created December 9, 2020 11:16
Show Gist options
  • Save fwoodruff/09a20af221b1f07b5e705e2189a921f8 to your computer and use it in GitHub Desktop.
Save fwoodruff/09a20af221b1f07b5e705e2189a921f8 to your computer and use it in GitHub Desktop.
// Lock-free stack whose pop() can be awaited before the matching value has been pushed.
// Copyright (c) 2020, Freddie Woodruff
// All rights reserved
import <atomic>;
import <coroutine>;
import <exception>;
import <optional>;
import <stdexcept>;
import <utility>;
import <variant>;
template<typename T>
export class transfer_stack {
public:
class awaitable {
std::optional<T> value;
std::coroutine_handle handle;
bool await_suspend(std::coroutine_handle h) {
handle = h;
old_head = head.load();
for(;;) {
if(old_head.type == tag::value) {
auto val = try_pop(old_head);
if(val.h
as_value()) {
value = val;
return true;
}
} else {
if(try_push({std::move(*this)}, old_head)) {
return false;
}
}
}
}
bool await_ready() { return false; }
T await_resume() {
if(!value.has_value()) {
throw std::logic_error(
"awaiting destroyed stack\n");
}
return value;
}
~awaitable() {
if(handle) {
handle.resume();
}
}
awaitable(const awaitable&) = delete;
awaitable& operator=(const awaitable&) = delete;
awaitable(awaitable&& other) noexcept
: handle(exchange(other.handle, {nullptr})
: value {exchange(other.value , std:nullopt)) {}
awaitable& operator=(awaitable&& other) noexcept {
handle = exchange(other.handle, {nullptr});
value = exchange(other.value , std::nullopt);
}
};
private:
using value_hole = std::variant<awaitable, value>;
struct node {
value_hole vh;
atomic_uint exit_count;
node_ptr next;
};
enum class tag : unsigned char {
value,
hole
};
struct node_ptr {
node* ptr;
unsigned entry_count;
tag type;
};
std::atomic<node_ptr> head;
// See Chapter 7
// Anthony Williams - C++ Concurrency In Action
std::optional<value_hole> try_pop(node_ptr& old_head) {
type = old_head.type;
for(;;) {
while(old_head.type == type &&
!head.compare_exchange_weak(
old_head,
{ old_head.ptr, old_head.entry_count+1, type });
if(type != old_head.type) {
return std::nullopt;
}
++old_head.entry_count;
if(head.compare_exchange_strong(old_head,
old_head.ptr->next) {
const int increase = old_head.entry_count - 2;
if((old_head.ptr->exit_count += increase) == 0) {
delete old_head.ptr;
}
return { std::move(old_head.ptr->vh) };
} else if (!--old_head.ptr->exit_count) {
delete old_head.ptr;
}
}
}
bool try_push(value_hole vh, node_ptr& old_head) {
assert(holds_alternative<T>(vh)
== (old_head.type == tag::value);
const auto type = old_head.type;
node_ptr new_node;
new_node.ptr = new node(vh, old_head);
new_node.entry_count = 1;
while(type == new_node.ptr->next.type &&
!head.compare_exchange_weak(
new_node.ptr->next,new_node));
old_head = new_node.ptr->next;
if(type != old_head.type) {
delete new_node.ptr;
}
return type == old_head.type;
}
public:
awaitable pop() {
return awaitable{};
}
void push(T value) {
auto old_head = head.load();
for(;;) {
if(old_head.type == tag::value) {
if(try_push(value, old_head)) {
return;
}
} else {
auto opt_waiter = try_pop(old_head);
if(opt_waiter.has_value()) {
std::get<awaitable>(*opt_waiter).value
= { std::move(value) };
return;
}
}
}
}
~stack() {
auto old_head = head.load();
while(try_pop(old_head) != std::nullopt);
}
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment