Skip to content

Instantly share code, notes, and snippets.

@s8sg
Created November 14, 2017 10:17
Show Gist options
  • Save s8sg/158266244b8315c1b901bdabb9cb9f69 to your computer and use it in GitHub Desktop.
Save s8sg/158266244b8315c1b901bdabb9cb9f69 to your computer and use it in GitHub Desktop.
A blazing fast multi producer single consumer lockless buffer queue
#include <stdbool.h>
#include <pthread.h>
#define EMPTY(pool) pool->end == NULL;
/* pool_node_t must be the last member of data item that needs
* to be added in the queue:
*
* typedef struct mydata {
* ....
* pool_node_t node;
* } mydata_t;
*/
typedef struct pool_node {
struct pool_node *next;
struct pool_node *prev;
} pool_node_t;
struct pool {
pool_node_t *start;
pool_node_t *end;
};
typedef struct pool pool_t;
/* As there are multiplle producer we need atomic operation
* while dequeueing */
#define __SET_END(pool, old_node, node) __sync_bool_compare_and_swap(&(pool->end), old_node, node)
#define __SET_START(pool, old_node, node) __sync_bool_compare_and_swap(&(pool->start), old_node, node)
#define __INIT_NODE(node) do{node->next=NULL;node->prev=NULL;}while(0)
/* Cast a data out to its initial type from node reference
* mydata_t *mydata = CAST_DATA(node, mydata_t)
*/
#define CAST_DATA(node, datatype) (datatype *)((char*)node - (sizeof(datatype) - sizeof(pool_node_t)))
// Init a pool
static inline void init_pool(pool_t *pool) {
pool->start = NULL;
pool->end = NULL;
}
/* Example:
*
* init:
* -----
* reqpool_t pool;
* init_pool(&pool);
*
* enqueue (N):
* -----------
* mydata_t *mydata = (mydata_t*)malloc(sizeof(mydata_t));
* enqueuei_mpsc(pool, &mydata->node);
*
* dequeue (1):
* -----------
* mydata_t *mydata;
* pool_node_t *node = dequeue_mpsc(pool);
* mydata = CAST_DATA(node, mydata_t);
*/
// Add node to the pool
static inline void enqueue_mpsc(pool_t *pool, pool_node_t *node) {
pool_node_t *start = NULL;
__INIT_NODE(node);
while(true) {
start = pool->start;
node->next = start;
if(__SET_START(pool, start, node)) {
/* we never reset pool->start to NULL even if the queue
* is empty so if the same address is enqueued twice it
* will create the vicious loop of a->prev = a. Which would
* cause dequeue to return same adress multiple time
* So we perform the extra check: start != node
* */
if (start != NULL && start != node) {
start->prev = node;
}
/* only successful if end is NULL */
__SET_END(pool, NULL, node);
break;
}else {
__INIT_NODE(node);
start = NULL;
continue;
}
}
}
// Dequeue node from pool (strictly single consumer)
static inline pool_node_t * dequeue_mpsc(pool_t *pool) {
pool_node_t *node = NULL;
node = pool->end;
if (node != NULL) {
if (!__SET_END(pool, node, node->prev)) {
node = NULL;
}
}
return node;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment