Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Atomic/lock-free linked list using a 128-bit compare-and-swap to solve the ABA problem.
// --------------------------------------------------------------------------------------
// Atomic singly-linked intrusive list using a 128-bit Compare-And-Swap
// (double-width CAS, a.k.a. DWCAS - often loosely referred to as DCAS).
// Keeps a version counter with the list head to prevent the A-B-A problem.
//
// Based on the implementation found in moodycamel.com:
// http://moodycamel.com/blog/2014/solving-the-aba-problem-for-lock-free-free-lists
//
// My implementation uses raw GCC/Clang atomics intrinsics. While in theory
// std::atomic of a struct of exactly 16 bytes and properly aligned could
// use 128-bits DCAS, this is not guaranteed. It might very well be implemented
// using a spinlock, which would be less efficient. Relying on the compiler
// intrinsics makes this code non-portable, but we ensure it will be using the
// optimal HW instructions. On Windows/MSVC, we could use _InterlockedCompareExchange128.
//
// I consider this code sample to be Public Domain, no copyrights claimed.
//
// NOTE:
// To make sure GCC and Clang emit a 'lock cmpxchg16b', you might
// have to add the '-mcx16' flag to the command line, as explained in:
// http://stackoverflow.com/a/18433055/1198654
//
// Tested with:
// c++ -std=c++11 -fno-rtti -fno-exceptions -Wall -Wextra -O3 -mcx16
//
// ASM listing on Clang with Intel syntax:
// -S -mllvm --x86-asm-syntax=intel
// --------------------------------------------------------------------------------------
#include <cassert>
#include <cstdint>
#include <cstdio>
#include <thread>
#include <atomic>
// GCC seems to have second thoughts about inlining the list push/pop functions, even at -O3...
#define ALWAYS_INLINE __attribute__((__always_inline__))
// ------------------------------------------------------------------
// CAS 128-bits:
// ------------------------------------------------------------------
using int128 = __int128;
ALWAYS_INLINE int toBuiltInMemOrder(std::memory_order order)
{
switch (order)
{
case std::memory_order_relaxed : return __ATOMIC_RELAXED;
case std::memory_order_consume : return __ATOMIC_CONSUME;
case std::memory_order_acquire : return __ATOMIC_ACQUIRE;
case std::memory_order_release : return __ATOMIC_RELEASE;
case std::memory_order_acq_rel : return __ATOMIC_ACQ_REL;
case std::memory_order_seq_cst : return __ATOMIC_SEQ_CST;
default : assert(false); return 0;
} // switch (order)
}
ALWAYS_INLINE bool CAS128(volatile int128 * destination,
int128 * expected,
int128 desired,
std::memory_order successMemOrder,
std::memory_order failureMemOrder)
{
const bool weak = true;
return __atomic_compare_exchange_n(destination, expected, desired, weak,
toBuiltInMemOrder(successMemOrder),
toBuiltInMemOrder(failureMemOrder));
}
// ------------------------------------------------------------------
// AtomicSListNode:
// ------------------------------------------------------------------
// Intrusive list hook: a type T gains its list link by deriving from
// AtomicSListNode<T> (CRTP-style), so the lists below do no allocation
// of their own - they only re-link nodes the caller provides.
template<typename T>
struct AtomicSListNode
{
// Next node in the singly-linked list, or nullptr at the tail.
// Overwritten by the list's push()/pop(); not meant to be set by users.
T * m_next = nullptr;
};
// ------------------------------------------------------------------
// AtomicSList128:
// ------------------------------------------------------------------
// Atomic and A-B-A safe.
// Head pointer is paired with a version count that
// is incremented whenever we try to change it.
// Lock-free LIFO list of intrusive nodes (T must expose a public T* m_next,
// e.g. by deriving from AtomicSListNode<T>). A-B-A safe: the head pointer is
// packed with a 64-bit version counter and both are swapped with one 128-bit
// CAS, so a head that was popped and re-pushed is detected by its version.
template<typename T>
class AtomicSList128 final
{
public:

    AtomicSList128()
    {
        // No concurrency is possible during construction; a plain store is fine.
        m_head.m_packed = 0;
    }

    // Pushes 'node' at the front of the list. The release ordering on the
    // successful CAS publishes the node->m_next write to any thread that
    // later acquires the head in pop().
    ALWAYS_INLINE void push(T * node)
    {
        VersionedPtr newHead;
        VersionedPtr currHead;

        newHead.m_pointer = node;

        // FIX: the head must be read with an atomic 16-byte load. The original
        // plain read races with concurrent CAS updates and could observe a torn
        // pointer/version pair. Relaxed suffices here: a stale snapshot is
        // detected by the CAS below, which also refreshes currHead on failure.
        currHead.m_packed = __atomic_load_n(&m_head.m_packed, __ATOMIC_RELAXED);

        do {
            newHead.m_version = currHead.m_version + 1;
            node->m_next = currHead.m_pointer;
        } while (!CAS128(&m_head.m_packed, &currHead.m_packed, newHead.m_packed,
                         std::memory_order_release, std::memory_order_relaxed));
    }

    // Pops and returns the front node, or nullptr if the list is empty.
    // NOTE: like all lock-free free-lists, nodes must not be freed while any
    // thread may still be inside pop(): we dereference currHead.m_pointer
    // below even though another thread may pop that node concurrently.
    ALWAYS_INLINE T * pop()
    {
        VersionedPtr newHead;
        VersionedPtr currHead;

        // FIX: atomic 16-byte load (see push). Acquire so the node->m_next
        // written by the releasing push() is visible before we dereference
        // the head node in the loop below.
        currHead.m_packed = __atomic_load_n(&m_head.m_packed, __ATOMIC_ACQUIRE);

        while (currHead.m_pointer != nullptr)
        {
            newHead.m_pointer = currHead.m_pointer->m_next;
            newHead.m_version = currHead.m_version + 1;

            // FIX: the failure order must not be stronger than the success
            // order before C++17, so the original release/acquire pair was
            // invalid. acq_rel/acquire is always legal and gives pop() the
            // acquire semantics it needs whether the CAS succeeds or fails.
            if (CAS128(&m_head.m_packed, &currHead.m_packed, newHead.m_packed,
                       std::memory_order_acq_rel, std::memory_order_acquire))
            {
                break;
            }
        }
        return currHead.m_pointer;
    }

    // These are non-atomic: only meaningful when no other thread can be
    // mutating the list (e.g. before the workers start or after they join).
    ALWAYS_INLINE T * first() const
    {
        return m_head.m_pointer;
    }
    ALWAYS_INLINE bool isEmpty() const
    {
        return first() == nullptr;
    }

    // Syntactic sugar to iterate the list.
    ALWAYS_INLINE static const T * next(const T * curr) { return curr->m_next; }
    ALWAYS_INLINE static T * next(      T * curr) { return curr->m_next; }

    // Not copyable.
    AtomicSList128(const AtomicSList128 &) = delete;
    AtomicSList128 & operator = (const AtomicSList128 &) = delete;

private:

    // Head pointer packed with a version counter into one 16-byte,
    // 16-byte-aligned word so both can be swapped by a single DCAS.
    // (Type punning through a union is supported by GCC/Clang, which
    // this file already targets via the __atomic_* intrinsics.)
    union alignas(16) VersionedPtr
    {
        struct
        {
            T * m_pointer;
            std::uint64_t m_version;
        };
        int128 m_packed;
    };

    static_assert(sizeof(VersionedPtr) == 16, "Bad size!");
    static_assert(alignof(VersionedPtr) == 16, "Bad alignment!");

    VersionedPtr m_head;
};
// ------------------------------------------------------------------
// AtomicSList64:
// ------------------------------------------------------------------
// Atomic, but *not* A-B-A safe, for comparison with the above.
// We can concurrently push and pop, but only as long
// as the incoming pointer is guaranteed to be unique.
// Still covers a lot of usage cases though, such as
// concurrently building up a list that will later be
// consumed by a single thread.
// Lock-free LIFO list of intrusive nodes (T must expose a public T* m_next).
// NOT A-B-A safe: concurrent push and pop are only correct as long as an
// incoming pointer is guaranteed unique (never re-pushed while another
// thread may be mid-pop holding a stale head). Still covers many cases,
// such as concurrently building a list later drained by a single thread.
template<typename T>
class AtomicSList64 final
{
public:

    AtomicSList64()
        : m_head{ nullptr }
    { }

    // Pushes 'node' at the front of the list. The release on a successful
    // CAS publishes the node->m_next write to threads that acquire the head.
    ALWAYS_INLINE void push(T * node)
    {
        T * currHead = m_head.load(std::memory_order_relaxed);
        do {
            node->m_next = currHead;
        } while (!m_head.compare_exchange_weak(currHead, node,
                                               std::memory_order_release,
                                               std::memory_order_relaxed));
    }

    // Pops and returns the front node, or nullptr if the list is empty.
    ALWAYS_INLINE T * pop()
    {
        // FIX: acquire (was relaxed) so the node->m_next written by a
        // releasing push() is visible before we dereference currHead below.
        T * currHead = m_head.load(std::memory_order_acquire);
        while (currHead != nullptr)
        {
            // FIX: the original release/acquire pair was invalid before C++17
            // (the failure order must not be stronger than the success order),
            // and pop needs acquire - not release - semantics anyway. On
            // failure, compare_exchange_weak reloads currHead with acquire.
            if (m_head.compare_exchange_weak(currHead, currHead->m_next,
                                             std::memory_order_acquire,
                                             std::memory_order_acquire))
            {
                break;
            }
        }
        return currHead;
    }

    // These are non-atomic snapshots; results can be stale under concurrency.
    ALWAYS_INLINE T * first() const
    {
        return m_head.load(std::memory_order_relaxed);
    }
    ALWAYS_INLINE bool isEmpty() const
    {
        return first() == nullptr;
    }

    // Syntactic sugar to iterate the list.
    ALWAYS_INLINE static const T * next(const T * curr) { return curr->m_next; }
    ALWAYS_INLINE static T * next(      T * curr) { return curr->m_next; }

    // Not copyable.
    AtomicSList64(const AtomicSList64 &) = delete;
    AtomicSList64 & operator = (const AtomicSList64 &) = delete;

private:

    std::atomic<T *> m_head;
};
// ------------------------------------------------------------------
// Tests:
// ------------------------------------------------------------------
// Node type used by the tests. Inherits its m_next link from
// AtomicSListNode<Test>, so it can be stored in either list flavor.
struct Test : AtomicSListNode<Test>
{
// Payload used by the tests to validate push/pop ordering.
int m_value;
Test() = default;
Test(int val) : m_value{ val }
{ }
};
template<typename ListType>
static void testListSingleThread()
{
std::printf("\n>>> %s()\n", __func__);
std::printf("----------------------\n");
ListType list;
Test t0{0};
Test t1{1};
Test t2{2};
Test t3{3};
Test t4{4};
list.push(&t0);
list.push(&t1);
list.push(&t2);
list.push(&t3);
list.push(&t4);
for (const Test * curr = list.first(); curr; curr = list.next(curr))
{
std::printf("curr=%p, next=%p, value=%i\n", curr, list.next(curr), curr->m_value);
}
std::printf("----------------------\n");
const Test * p4 = list.pop();
const Test * p3 = list.pop();
const Test * p2 = list.pop();
assert(p4 == &t4);
assert(p3 == &t3);
assert(p2 == &t2);
assert(!list.isEmpty());
for (const Test * curr = list.first(); curr; curr = list.next(curr))
{
std::printf("curr=%p, next=%p, value=%i\n", curr, list.next(curr), curr->m_value);
}
const Test * p1 = list.pop();
const Test * p0 = list.pop();
assert(p1 == &t1);
assert(p0 == &t0);
assert(list.isEmpty());
assert(list.pop() == nullptr);
std::printf("----------------------\n");
}
template<typename ListType>
static void workOnTheList(ListType * pList)
{
assert(pList != nullptr);
constexpr int NumToPushPop = 256;
Test * items[NumToPushPop];
for (int i = 0; i < NumToPushPop; ++i)
{
items[i] = new Test{ i }; // We'll intentionally leak this memory for now...
}
for (int i = 0; i < NumToPushPop; ++i)
{
pList->push(items[i]);
}
assert(!pList->isEmpty());
for (int i = 0; i < NumToPushPop; ++i)
{
const Test * item = pList->pop();
assert(item != nullptr);
assert(item->m_value >= 0 && item->m_value < NumToPushPop);
}
std::printf("Thread %#zx OK\n", std::hash<std::thread::id>{}(std::this_thread::get_id()));
}
// Concurrency test for either list type, in two phases:
// 1) several symmetric threads each push and pop a fixed batch of nodes;
// 2) a producer/consumer pair runs until the main thread stops them.
// Both phases must leave the list fully drained (checked via isEmpty()).
template<typename ListType>
static void testListWithThreads()
{
std::printf("\n>>> %s()\n", __func__);
std::printf("----------------------\n");
ListType theList;
// Multiple threads randomly pushing and popping to the list:
{
std::printf("Testing multiple threads operating on list ...\n");
constexpr int NumThreads = 8;
std::thread threads[NumThreads];
for (int t = 0; t < NumThreads; ++t)
{
threads[t] = std::thread(&workOnTheList<ListType>, &theList);
}
for (int t = 0; t < NumThreads; ++t)
{
std::printf("Joining thread #%i ...\n", t);
threads[t].join();
}
// Each thread pops exactly as many nodes as it pushed, so once
// all of them have joined the list must be empty again.
assert(theList.isEmpty());
}
// Producer consumer style of operation,
// one thread pushes, the other pops:
{
std::printf("Testing producer/consumer threads ...\n");
std::atomic<bool> stopProducing{ false };
std::atomic<bool> stopConsuming{ false };
std::thread producer{
[&theList, &stopProducing, &stopConsuming]() {
int next = 0;
while (!stopProducing)
{
Test * item = new Test{ next++ };
std::printf("[Producer]: Pushing item %p {%i}\n", item, item->m_value);
theList.push(item);
}
// Set only after the final push, so the consumer's drain loop
// below cannot exit while a trailing item is still queued.
stopConsuming = true;
}
};
std::thread consumer{
[&theList, &stopConsuming]() {
// Keep draining until told to stop AND the list is empty.
while (!stopConsuming || !theList.isEmpty())
{
while (const Test * item = theList.pop())
{
std::printf("[Consumer]: Popping item %p {%i}\n", item, item->m_value);
// Deleting is safe here because this is the only thread
// popping in this phase; no one else can hold this node.
delete item;
}
}
}
};
// Let the pair run for a while; yielding gives them CPU time.
constexpr std::size_t count = 100000;
std::printf("Main thread counting to %zu ...\n", count);
for (std::size_t i = 0; i < count; ++i)
{
std::this_thread::yield();
}
stopProducing = true;
producer.join();
consumer.join();
assert(theList.isEmpty());
}
std::printf("Main thread OK\n");
std::printf("----------------------\n");
}
// ------------------------------------------------------------------
// main():
// ------------------------------------------------------------------
int main()
{
using AtomicListSimple = AtomicSList64<Test>;
using AtomicListABASafe = AtomicSList128<Test>;
testListSingleThread<AtomicListSimple>();
testListSingleThread<AtomicListABASafe>();
testListWithThreads<AtomicListSimple>();
testListWithThreads<AtomicListABASafe>();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment