Created
November 14, 2017 10:17
-
-
Save s8sg/158266244b8315c1b901bdabb9cb9f69 to your computer and use it in GitHub Desktop.
A blazing fast multi producer single consumer lockless buffer queue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <stdbool.h> | |
#include <pthread.h> | |
#define EMPTY(pool) pool->end == NULL; | |
/* pool_node_t must be the last member of data item that needs | |
* to be added in the queue: | |
* | |
* typedef struct mydata { | |
* .... | |
* pool_node_t node; | |
* } mydata_t; | |
*/ | |
typedef struct pool_node { | |
struct pool_node *next; | |
struct pool_node *prev; | |
} pool_node_t; | |
struct pool { | |
pool_node_t *start; | |
pool_node_t *end; | |
}; | |
typedef struct pool pool_t; | |
/* As there are multiplle producer we need atomic operation | |
* while dequeueing */ | |
#define __SET_END(pool, old_node, node) __sync_bool_compare_and_swap(&(pool->end), old_node, node) | |
#define __SET_START(pool, old_node, node) __sync_bool_compare_and_swap(&(pool->start), old_node, node) | |
#define __INIT_NODE(node) do{node->next=NULL;node->prev=NULL;}while(0) | |
/* Cast a data out to its initial type from node reference | |
* mydata_t *mydata = CAST_DATA(node, mydata_t) | |
*/ | |
#define CAST_DATA(node, datatype) (datatype *)((char*)node - (sizeof(datatype) - sizeof(pool_node_t))) | |
// Init a pool | |
static inline void init_pool(pool_t *pool) { | |
pool->start = NULL; | |
pool->end = NULL; | |
} | |
/* Example: | |
* | |
* init: | |
* ----- | |
* reqpool_t pool; | |
* init_pool(&pool); | |
* | |
* enqueue (N): | |
* ----------- | |
* mydata_t *mydata = (mydata_t*)malloc(sizeof(mydata_t)); | |
* enqueuei_mpsc(pool, &mydata->node); | |
* | |
* dequeue (1): | |
* ----------- | |
* mydata_t *mydata; | |
* pool_node_t *node = dequeue_mpsc(pool); | |
* mydata = CAST_DATA(node, mydata_t); | |
*/ | |
// Add node to the pool | |
static inline void enqueue_mpsc(pool_t *pool, pool_node_t *node) { | |
pool_node_t *start = NULL; | |
__INIT_NODE(node); | |
while(true) { | |
start = pool->start; | |
node->next = start; | |
if(__SET_START(pool, start, node)) { | |
/* we never reset pool->start to NULL even if the queue | |
* is empty so if the same address is enqueued twice it | |
* will create the vicious loop of a->prev = a. Which would | |
* cause dequeue to return same adress multiple time | |
* So we perform the extra check: start != node | |
* */ | |
if (start != NULL && start != node) { | |
start->prev = node; | |
} | |
/* only successful if end is NULL */ | |
__SET_END(pool, NULL, node); | |
break; | |
}else { | |
__INIT_NODE(node); | |
start = NULL; | |
continue; | |
} | |
} | |
} | |
// Dequeue node from pool (strictly single consumer) | |
static inline pool_node_t * dequeue_mpsc(pool_t *pool) { | |
pool_node_t *node = NULL; | |
node = pool->end; | |
if (node != NULL) { | |
if (!__SET_END(pool, node, node->prev)) { | |
node = NULL; | |
} | |
} | |
return node; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment