Skip to content

Instantly share code, notes, and snippets.

@lpereira
Created October 31, 2015 12:17
Show Gist options
  • Save lpereira/d74da26c87f73e722db4 to your computer and use it in GitHub Desktop.
Save lpereira/d74da26c87f73e722db4 to your computer and use it in GitHub Desktop.
/*
* SPSC Bounded Queue
* Based on public domain C++ version by mstump[1]. Released under
* the same license terms.
*
* [1] https://github.com/mstump/queues/blob/master/include/spsc-bounded-queue.hpp
*/
#if !defined(__ATOMIC_RELAXED)
#define ATOMIC_RELAXED __ATOMIC_RELAXED
#define ATOMIC_ACQUIRE __ATOMIC_ACQUIRE
#define ATOMIC_RELEASE __ATOMIC_RELEASE
#endif
#if defined(__GNUC__) && (__GNUC__ * 100 + __GNUC_MINOR__ >= 470)
#define ATOMIC_TYPE(T) struct { volatile typeof(T) val; }
#define ATOMIC_INIT(P, V) do { (P)->val = (V); } while (0)
#define ATOMIC_LOAD(P, O) __atomic_load_n(&(P)->val, (O))
#define ATOMIC_STORE(P, V, O) __atomic_store_n(&(P)->val, (V), (O))
#elif defined(__has_feature) && __has_feature(c_atomic)
#define ATOMIC_TYPE(T) T
#define ATOMIC_INIT(P, V) __c11_atomic_init((P), (V))
#define ATOMIC_LOAD(P, O) __c11_atomic_load((P), (O))
#define ATOMIC_STORE(P, V, O) __c11_atomic_store((P), (V), (O))
#else
#error Unsupported compiler.
#endif
typedef ATOMIC_TYPE(size_t) atomic_size_t;
struct spsc_queue {
size_t size;
size_t mask;
void *buffer;
char cache_line_pad0[64 - sizeof(size_t) + sizeof(size_t) + sizeof(void *)];
atomic_size_t head;
char cache_line_pad1[64 - sizeof(atomic_size_t)];
atomic_size_t tail;
char cache_line_pad2[64 - sizeof(atomic_size_t)];
};
int
spsc_queue_init(struct spsc_queue *q, size_t size)
{
if (size == 0)
return -EINVAL;
if ((size & (~size + 1)) == size)
return -EINVAL;
q->buffer = aligned_alloc(sizeof(void *) * (1 + size), sizeof(void *));
if (!q->buffer)
return -ENOMEM;
q->head = ATOMIC_INIT(0);
q->tail = ATOMIC_INIT(0);
q->size = size;
q->mask = size - 1;
return 0;
}
void
spsc_queue_free(struct spsc_queue *q)
{
free(q->buffer);
}
bool
spsc_queue_push(struct spsc_queue *q, void *input)
{
const size_t head = ATOMIC_LOAD(&q->head, ATOMIC_RELAXED);
if (((ATOMIC_LOAD(q->tail, ATOMIC_ACQUIRE) - (head + 1)) & q->mask) >= 1) {
q->buffer[head & q->mask] = input;
ATOMIC_STORE(q->head, head + 1, ATOMIC_RELEASE);
return true;
}
return false;
}
void *
spsc_queue_pop(struct spsc_queue *q)
{
const size_t tail = ATOMIC_LOAD(&q->tail, ATOMIC_RELAXED);
if (((ATOMIC_LOAD(q->head, ATOMIC_ACQUIRE) - tail) & q->mask) >= 1) {
void *output = q->buffer[tail & q->mask];
ATOMIC_STORE(q->tail, tail + 1, ATOMIC_RELEASE);
return output;
}
return NULL;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment