Last active
October 27, 2021 07:56
-
-
Save ohazi/40746a16c7fea4593bd0b664638d7017 to your computer and use it in GitHub Desktop.
Single producer, single consumer, lock-free queue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <stdint.h> | |
#include <stddef.h> | |
#include <string.h> | |
#include <stdatomic.h> | |
#define NUM_BLOCKS 16 | |
#define BLOCK_SIZE 256 | |
#define SUCCESS 0 | |
#define FAILURE 1 | |
// queue_buf is a circular buffer containing (NUM_BLOCKS - 1) blocks of size | |
// BLOCK_SIZE. | |
static uint8_t queue_buf[NUM_BLOCKS][BLOCK_SIZE]; | |
// queue_rd is the read index. If the queue is not empty, it contains the | |
// index of the next block that can be popped from the queue. This variable | |
// is owned by the consumer thread. | |
static atomic_size_t queue_rd; | |
// queue_wr is the write index. If the queue is not full, it contains the | |
// index of the location where the next block can be pushed onto the queue. | |
// This variable is owned by the producer thread. | |
static atomic_size_t queue_wr; | |
// The queue is empty when queue_rd = queue_wr. The queue is full when | |
// queue_wr = queue_rd - 1 (modulo wrapping). For simplicity, BLOCK_SIZE | |
// bytes are wasted in the queue_buf array due to the queue empty / full | |
// ambiguity. | |
// Initialize / reset the queue. | |
// Must be called before producer or consumer threads start. Cannot be called | |
// in either producer or consumer thread while the other is running. | |
static inline void queue_reset(void) { | |
queue_rd = 0; | |
queue_wr = 0; | |
} | |
// Snapshot of the number of blocks in the queue. Can only be called from the | |
// consumer thread. Consumer is guaranteed that there are at least this many | |
// blocks that can be read from the queue. | |
static inline size_t queue_blocks_occupied(void) { | |
size_t rd = atomic_load_explicit(&queue_rd, memory_order_relaxed); | |
size_t wr = atomic_load_explicit(&queue_wr, memory_order_acquire); | |
return wr < rd ? (NUM_BLOCKS + wr) - rd : wr - rd; | |
} | |
// Snapshot of the number of empty slots in queue. Can only be called from the | |
// producer thread. Producer is guaranteed that there are at least this many | |
// empty slots in the queue that can be written into. | |
static inline size_t queue_blocks_available(void) { | |
size_t rd = atomic_load_explicit(&queue_rd, memory_order_acquire); | |
size_t wr = atomic_load_explicit(&queue_wr, memory_order_relaxed); | |
return wr < rd ? rd - wr - 1 : ((NUM_BLOCKS - 1) + rd) - wr; | |
} | |
// Increment write index. | |
// Can only be called from producer thread. | |
static inline void queue_inc_wr(void) { | |
size_t wr_next = atomic_load_explicit(&queue_wr, memory_order_relaxed) + 1; | |
wr_next = wr_next < NUM_BLOCKS ? wr_next : 0; | |
atomic_store_explicit(&queue_wr, wr_next, memory_order_release); | |
} | |
// Increment read index. | |
// Can only be called from consumer thread. | |
static inline void queue_inc_rd(void) { | |
size_t rd_next = atomic_load_explicit(&queue_rd, memory_order_relaxed) + 1; | |
rd_next = rd_next < NUM_BLOCKS ? rd_next : 0; | |
atomic_store_explicit(&queue_rd, rd_next, memory_order_release); | |
} | |
// Inserts a new block into the queue. | |
// Can only be called from producer thread. | |
// buffer assumed to point to valid data of length BLOCK_SIZE. | |
static inline int queue_push(uint8_t *buffer) { | |
if (queue_blocks_available() > 0) { | |
size_t wr = atomic_load_explicit(&queue_wr, memory_order_relaxed); | |
memcpy(&queue_buf[wr][0], buffer, BLOCK_SIZE); | |
queue_inc_wr(); | |
return SUCCESS; | |
} else { | |
return FAILURE; | |
} | |
} | |
// Reads a block from the queue. | |
// Can only be called from consumer thread. | |
// buffer assumed to point to valid location of length BLOCK_SIZE. | |
static inline int queue_pop(uint8_t *buffer) { | |
if (queue_blocks_occupied() > 0) { | |
size_t rd = atomic_load_explicit(&queue_rd, memory_order_relaxed); | |
memcpy(buffer, &queue_buf[rd][0], BLOCK_SIZE); | |
queue_inc_rd(); | |
return SUCCESS; | |
} else { | |
return FAILURE; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment