Skip to content

Instantly share code, notes, and snippets.

@pjako
Last active January 28, 2021 10:58
Show Gist options
  • Save pjako/9e8aad90b6286c1ebc37dcb94f6445bb to your computer and use it in GitHub Desktop.
Save pjako/9e8aad90b6286c1ebc37dcb94f6445bb to your computer and use it in GitHub Desktop.
Lockless mpmc queue
#include <stdatomic.h>
#include <stdint.h>
#include <assert.h>
#include <stdbool.h>
// Atomic integer aliases: 64-bit for the queue cursors, 32-bit for the slots.
typedef _Atomic(uint64_t) a64;
typedef _Atomic(uint32_t) a32;

// Thin wrappers over the C11 generic atomics (default seq_cst ordering).
// The compare-exchange wrappers use the WEAK form: it may fail spuriously, and
// on any failure the value currently stored in *DESTPTR is written back into
// *COMPERAND. Callers must therefore retry in a loop and re-initialise the
// comperand if they need to keep comparing against a fixed value.
#define atomicCompareExchange64(DESTPTR, COMPERAND, EXCHANGE) atomic_compare_exchange_weak(DESTPTR, COMPERAND, EXCHANGE)
#define atomicCompareExchange32(DESTPTR, COMPERAND, EXCHANGE) atomicCompareExchange64(DESTPTR, COMPERAND, EXCHANGE)
#define atomicLoad64(VALPTR) atomic_load(VALPTR)
#define atomicLoad32(VALPTR) atomic_load(VALPTR)

// Number of slots in the ring and the mask that maps a cursor to a slot.
#define NUM_ELEMENTS 64
#define QUEUE_MASK (NUM_ELEMENTS - 1)
// Masking only works as a modulo when the slot count is a power of two.
_Static_assert((NUM_ELEMENTS) > 0 && ((NUM_ELEMENTS) & (QUEUE_MASK)) == 0,
               "NUM_ELEMENTS must be a power of two");

// The single, file-local queue instance (zero-initialised: empty, all slots free).
//   out      - cursor of the next entry to pull (masked when indexing)
//   in       - cursor of the next entry to add  (masked when indexing)
//   elements - slots; 0 means "free", any other value is a stored index + 1
static struct {
    a64 out;
    a64 in;
    a32 elements[NUM_ELEMENTS];
} queue;
bool addToQueue(uint32_t index) {
index += 1; // 0 ist reseved to track fee entries so we increase indicies by one on add and decrease again it on pull
uint64_t in;
for (;;) {
a64 out = atomicLoad64(&queue.out);
in = atomicLoad64(&queue.in);
uint64_t inNext = in + 1;
// we want to make sure that the queue is not full
if (inNext == out) {
assert(!"Queue is full!");
return false;
}
if (atomicCompareExchange64(&queue.in, &in, inNext)) {
break;
}
}
a32* task = queue.elements + (in & QUEUE_MASK);
uint32_t expected = 0;
while (!atomicCompareExchange32(task, &expected, index)) {
// this entry still waits to get set to zero on the other end of the queue
// wait till it is done
}
return true;
}
// Sentinel returned by pullFromQueue() when there is nothing to pull.
// Defined as a macro rather than an enumerator: before C23 an enumeration
// constant must be representable as int, which 0xFFFFFFFF is not.
#define invalidIndex UINT32_MAX // uint32_t max value (0xFFFFFFFF)
/**
 * Remove one entry from the queue.
 *
 * @return the value originally passed to addToQueue(), or invalidIndex if the
 *         `in` cursor was not ahead of the `out` cursor (nothing to pull).
 */
uint32_t pullFromQueue(void) {
    uint64_t out;
    for (;;) {
        // Load `in` first: a stale `in` can only hide a freshly added entry,
        // it can never make the queue look fuller than it is. Plain locals are
        // enough here; only the shared cursors need atomic access.
        const uint64_t in = atomicLoad64(&queue.in);
        out = atomicLoad64(&queue.out);
        const uint64_t outNext = out + 1;
        if (in <= out) {
            // nothing has been added beyond what was already claimed
            return invalidIndex;
        }
        // Claim cursor position `out`; retry if another consumer got there
        // first or the weak CAS failed spuriously.
        if (atomicCompareExchange64(&queue.out, &out, outNext)) {
            break;
        }
    }

    a32* task = queue.elements + (out & QUEUE_MASK);
    uint32_t index;
    for (;;) {
        index = atomicLoad32(task);
        if (index == 0) {
            // The producer that claimed this position has not written its
            // value yet, so wait until it has. This is a lock-like wait in an
            // otherwise lockless implementation.
            continue;
        }
        // Take the value and mark the slot free (0) in one step.
        if (atomicCompareExchange32(task, &index, 0)) {
            break;
        }
    }
    // Slots hold the caller's value + 1 (0 is the "free" marker), so undo the
    // offset. `index` is never 0 at this point.
    return index - 1;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment