Skip to content

Instantly share code, notes, and snippets.

@ohazi
Last active October 27, 2021 07:56
Show Gist options
  • Save ohazi/40746a16c7fea4593bd0b664638d7017 to your computer and use it in GitHub Desktop.
Save ohazi/40746a16c7fea4593bd0b664638d7017 to your computer and use it in GitHub Desktop.
Single producer, single consumer, lock-free queue
#include <stdint.h>
#include <stddef.h>
#include <string.h>
#include <stdatomic.h>
// Number of slots in queue_buf. One slot is always kept empty (see the
// empty / full note below), so the queue holds at most (NUM_BLOCKS - 1)
// blocks at a time.
#define NUM_BLOCKS 16
// Size of one block in bytes. queue_push and queue_pop each copy exactly this
// many bytes.
#define BLOCK_SIZE 256
// Return codes of queue_push and queue_pop.
#define SUCCESS 0
#define FAILURE 1
// queue_buf is the backing storage of the circular buffer: NUM_BLOCKS slots
// of BLOCK_SIZE bytes each, of which at most (NUM_BLOCKS - 1) are occupied at
// any time.
static uint8_t queue_buf[NUM_BLOCKS][BLOCK_SIZE];
// queue_rd is the read index. If the queue is not empty, it contains the
// index of the next block that can be popped from the queue. This variable
// is owned by the consumer thread: apart from queue_reset, only queue_inc_rd
// stores to it, and the producer only reads it.
static atomic_size_t queue_rd;
// queue_wr is the write index. If the queue is not full, it contains the
// index of the location where the next block can be pushed onto the queue.
// This variable is owned by the producer thread: apart from queue_reset, only
// queue_inc_wr stores to it, and the consumer only reads it.
static atomic_size_t queue_wr;
// The queue is empty when queue_rd == queue_wr. The queue is full when
// queue_wr == queue_rd - 1 (modulo NUM_BLOCKS). Keeping one slot unused
// (BLOCK_SIZE bytes of queue_buf) is the price of telling these two states
// apart without any extra bookkeeping.
// Initialize / reset the queue.
// Must be called before producer or consumer threads start. Cannot be called
// in either producer or consumer thread while the other is running.
static inline void queue_reset(void) {
queue_rd = 0;
queue_wr = 0;
}
// Returns how many blocks are currently stored in the queue.
// Consumer thread only. The value is a snapshot and a lower bound for the
// consumer: the producer may add blocks concurrently, but never removes any.
static inline size_t queue_blocks_occupied(void) {
    // The consumer is the only writer of queue_rd, so relaxed is enough here.
    const size_t head = atomic_load_explicit(&queue_rd, memory_order_relaxed);
    // Acquire pairs with the release store in queue_inc_wr, so the contents
    // of every block counted here are visible to the consumer.
    const size_t tail = atomic_load_explicit(&queue_wr, memory_order_acquire);

    if (tail >= head) {
        return tail - head;
    }
    // The write index has wrapped around past the end of the buffer.
    return (tail + NUM_BLOCKS) - head;
}
// Returns how many free slots the queue currently has.
// Producer thread only. The value is a snapshot and a lower bound for the
// producer: the consumer may free slots concurrently, but never fills any.
static inline size_t queue_blocks_available(void) {
    // Acquire pairs with the release store in queue_inc_rd, so the consumer
    // has finished copying out of every slot counted as free here.
    const size_t head = atomic_load_explicit(&queue_rd, memory_order_acquire);
    // The producer is the only writer of queue_wr, so relaxed is enough here.
    const size_t tail = atomic_load_explicit(&queue_wr, memory_order_relaxed);

    if (tail >= head) {
        // One slot always stays unused, hence (NUM_BLOCKS - 1).
        return (head + (NUM_BLOCKS - 1)) - tail;
    }
    // The write index has wrapped and sits behind the read index.
    return (head - tail) - 1;
}
// Advances the write index by one slot, wrapping to 0 after the last slot.
// Producer thread only.
static inline void queue_inc_wr(void) {
    size_t next = atomic_load_explicit(&queue_wr, memory_order_relaxed) + 1;
    if (next >= NUM_BLOCKS) {
        next = 0;
    }
    // Release: writes made before this call become visible to a thread that
    // acquire-loads the new index.
    atomic_store_explicit(&queue_wr, next, memory_order_release);
}
// Advances the read index by one slot, wrapping to 0 after the last slot.
// Consumer thread only.
static inline void queue_inc_rd(void) {
    size_t next = atomic_load_explicit(&queue_rd, memory_order_relaxed) + 1;
    if (next >= NUM_BLOCKS) {
        next = 0;
    }
    // Release: reads/writes made before this call are ordered before a thread
    // that acquire-loads the new index.
    atomic_store_explicit(&queue_rd, next, memory_order_release);
}
// Copies one block into the queue.
// Producer thread only.
//
// buffer: points to BLOCK_SIZE bytes of valid data. It is only read, so it is
//         const-qualified and callers may pass const data without a cast.
// Returns SUCCESS if the block was stored, or FAILURE if the queue is full,
// in which case nothing is copied and the write index is left unchanged.
static inline int queue_push(const uint8_t *buffer) {
    if (queue_blocks_available() == 0) {
        return FAILURE;
    }

    // The producer is the only writer of queue_wr, so relaxed is enough here.
    const size_t wr = atomic_load_explicit(&queue_wr, memory_order_relaxed);
    memcpy(queue_buf[wr], buffer, BLOCK_SIZE);

    // The release store inside queue_inc_wr publishes the copied bytes: the
    // consumer sees them once it observes the advanced write index.
    queue_inc_wr();
    return SUCCESS;
}
// Copies the oldest block out of the queue and removes it.
// Consumer thread only.
//
// buffer: points to writable storage of at least BLOCK_SIZE bytes.
// Returns SUCCESS if a block was copied into buffer, or FAILURE if the queue
// is empty, in which case buffer is not touched.
static inline int queue_pop(uint8_t *buffer) {
    if (queue_blocks_occupied() == 0) {
        return FAILURE;
    }

    // The consumer is the only writer of queue_rd, so relaxed is enough here.
    const size_t slot = atomic_load_explicit(&queue_rd, memory_order_relaxed);
    memcpy(buffer, queue_buf[slot], BLOCK_SIZE);

    // Advancing the read index hands the slot back to the producer; this
    // happens only after the copy above has been made.
    queue_inc_rd();
    return SUCCESS;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment