Skip to content

Instantly share code, notes, and snippets.

@inspirit
Created June 20, 2015 22:35
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save inspirit/f044cabb9c5000a554fc to your computer and use it in GitHub Desktop.
Save inspirit/f044cabb9c5000a554fc to your computer and use it in GitHub Desktop.
Lock-free Stack using Double width CAS and Packed/Tagged pointer (based on Go)
#pragma once
#include <atomic>
#include <type_traits>
template<typename T, size_t Capacity>
struct lfstack {
struct node_t final {
T value;
node_t *next;
};
struct head_t final {
uintptr_t aba;
node_t *node;
head_t() noexcept :aba(0),node(nullptr)
{}
head_t(node_t* ptr) noexcept :aba(0),node(ptr)
{}
};
static_assert(sizeof(head_t)== 2*sizeof(uintptr_t), "Stack head should be 2 pointers size.");
alignas(128) std::atomic<head_t> head;
alignas(128) std::atomic<head_t> free_nodes;
using node_type = typename std::aligned_storage<sizeof(node_t), 128>::type;
node_type node_buffer[Capacity];
lfstack()
{
head.store( head_t(), std::memory_order_relaxed );
for(size_t i = 0; i < Capacity - 1; ++i)
{
reinterpret_cast<node_t*>(&node_buffer[i])->next = reinterpret_cast<node_t*>(&node_buffer[i + 1]);
}
reinterpret_cast<node_t*>(&node_buffer[Capacity-1])->next = nullptr;
free_nodes.store(head_t(reinterpret_cast<node_t*>(&node_buffer[0])), std::memory_order_relaxed);
// Issue memory barrier so we can immediately start work
std::atomic_thread_fence(std::memory_order_release);
}
template<class U>
bool push(U && data)
{
node_t *node = _pop(free_nodes);
if (node == nullptr)
return false;
node->value = std::forward<U>(data);
_push(head, node);
return true;
}
bool pop(T& data)
{
node_t *node = _pop(head);
if (node == nullptr)
return false;
data = std::move(node->value);
_push(free_nodes, node);
return true;
}
node_t* _pop(std::atomic<head_t>& h)
{
head_t next, orig = h.load(std::memory_order_relaxed);
do {
if (orig.node == nullptr)
return nullptr;
next.aba = orig.aba + 1;
next.node = orig.node->next;
} while (!h.compare_exchange_weak(orig, next,
std::memory_order_acq_rel,
std::memory_order_acquire));
return orig.node;
}
void _push(std::atomic<head_t>& h, node_t* node)
{
head_t next, orig = h.load(std::memory_order_relaxed);
do {
node->next = orig.node;
next.aba = orig.aba + 1;
next.node = node;
} while (!h.compare_exchange_weak(orig, next,
std::memory_order_acq_rel,
std::memory_order_acquire));
}
};
#pragma once
#include <atomic>
#include <type_traits>
namespace detail {
template<typename T, size_t PSize> struct pointer_pack {};
template<typename T> struct pointer_pack<T,8>
{
// 64Bit Machine
// On AMD64, virtual addresses are 48-bit numbers sign extended to 64.
// We shift the address left 16 to eliminate the sign extended part and make
// room in the bottom for the count.
// In addition to the 16 bits taken from the top, we can take 3 from the
// bottom, because node must be pointer-aligned, giving a total of 19 bits
// of count.
uint64_t operator() (T *node, uintptr_t cnt) const
{
return (uint64_t)((uintptr_t)(node))<<16 | (uint64_t)(cnt&((1<<19)-1));
}
T* operator() (uint64_t val) const
{
return (T*)(uintptr_t)(int64_t(val) >> 19 << 3);
}
};
// 32Bit Machine
template<typename T> struct pointer_pack<T,4>
{
uint64_t operator() (T *node, uintptr_t cnt) const
{
return (uint64_t)((uintptr_t)(node))<<32 | (uint64_t)(cnt);
}
T* operator() (uint64_t val) const
{
return (T*)(uintptr_t)(val >> 32);
}
};
} // detail
template<typename T, size_t Capacity>
struct lfstack
{
struct node_t final
{
std::atomic<uint64_t> next;
uintptr_t pushcnt;
T data;
};
using pointer_pack = detail::pointer_pack<node_t,sizeof(uintptr_t)>;
using node_type = typename std::aligned_storage<sizeof(node_t), 128>::type;
node_type _pool[Capacity];
alignas(128) std::atomic<uint64_t> _head = {0};
alignas(128) std::atomic<uint64_t> _free = {0};
lfstack()
{
// push all pool nodes to free stack
for(size_t i = 0; i < Capacity; ++i)
{
node_t* node = reinterpret_cast<node_t*>(&_pool[i]);
node->pushcnt = 0;
node->next.store(0, std::memory_order_relaxed);
_push(_free, node);
}
// Issue memory barrier so we can immediately start work
std::atomic_thread_fence(std::memory_order_release);
}
template<class U>
bool push(U && data)
{
node_t *node = _pop(_free);
if (node == nullptr)
return false;
node->data = std::forward<U>(data);
_push(_head, node);
return true;
}
bool pop(T& data)
{
node_t *node = _pop(_head);
if (node == nullptr)
return false;
data = std::move(node->data);
_push(_free, node);
return true;
}
void _push(std::atomic<uint64_t>& h, node_t* node)
{
node->pushcnt++;
uint64_t packed = pointer_pack()(node, node->pushcnt);
// check
// if(pointer_pack()(packed) != node)
// throw std::runtime_error("lfstack push invalid packing!");
uint64_t old = h.load(std::memory_order_relaxed);
do
{
node->next.store(old, std::memory_order_relaxed);
} while (!h.compare_exchange_weak(old, packed));
}
node_t* _pop(std::atomic<uint64_t>& h)
{
uint64_t next, old = h.load(std::memory_order_relaxed);
node_t* node;
do
{
if(old == 0) return nullptr;
node = pointer_pack()(old);
next = node->next.load(std::memory_order_relaxed);
} while (!h.compare_exchange_weak(old, next));
return node;
}
};
#include <atomic>
#include <thread>
#include <chrono>
#include <vector>
#include <array>
#include <future>
#include <iostream>
#include <random>
#include "lfstack_tagptr.hpp"
// #include "lfstack_dcas.hpp"
int main()
{
const size_t K = 100;
const size_t P = 4 * std::thread::hardware_concurrency();
const size_t N = 100000/1;
// Create 2 stacks.
lfstack<size_t,K> stacks[2];
std::cout << "Push "<<K<<" elements randomly onto the stacks.\n";
size_t sum = 0;
for(size_t i = 0; i < K; ++i)
{
sum += i;
stacks[i%2].push(i);
}
std::vector<double> timings(P);
std::vector<std::future<void>> dfuts;
std::atomic<int> cdl(P);
for (size_t i = 0; i < P; ++i)
dfuts.push_back(std::async(std::launch::async, [&, i]() {
--cdl;
while (cdl.load())
std::this_thread::sleep_for(std::chrono::milliseconds(5));
std::minstd_rand rng;
rng.seed(i);
std::uniform_int_distribution<size_t> distr(0, N);
auto start_time = std::chrono::high_resolution_clock::now();
// Pop a node from a random stack, then push it onto a random stack.
for(size_t it = 0; it < N; ++it)
{
size_t val;
if(stacks[ distr(rng)&1 ].pop(val))
{
stacks[ distr(rng)&1 ].push(val);
}
}
auto end_time = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> dur = std::chrono::duration_cast<std::chrono::duration<double>>(end_time - start_time);
timings[i] = dur.count();
}));
for (auto& fut : dfuts)
fut.get();
std::cout << "Pop all elements from both stacks, and verify that nothing lost.\n";
size_t sum2 = 0;
size_t cnt = 0;
for(size_t i = 0; i < 2; ++i)
{
size_t val;
size_t pp = 0;
while(stacks[i].pop(val))
{
cnt++;
sum2 += val;
}
}
std::cout << "number of nodes " << cnt << "/" << K << "\n";
std::cout << "sum " << sum2 << "/" << sum << "\n";
double mean_time = 0;
for (size_t i = 0; i < P; ++i)
mean_time += timings[i];
std::cout << "mean time: " << (mean_time/P*1000) << "ms\n";
return 0;
}
@inspirit
Copy link
Author

on my 4 core i7 MacBook i have:
DCAS stack: ~370ms
TagPtr stack: ~260ms

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment