-
-
Save paulyc/4c929b70949f2ef1d787e766e742f591 to your computer and use it in GitHub Desktop.
A blazing fast single producer multiple consumer lockless queue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* NOTE: The design decision of the req pool is tuned to get the | |
* best possible performance. Below point describes | |
* the design decisions: | |
* >> reqpool is a buffer queue where the producer adds at the start | |
* and consumers consume from the end | |
* >> it is strictly one consumer and multiple producers queue | |
* >> It is unbounded queue and avoids any resource allocation on heap | |
* >> It is lockless and use atomic operation to avoid race condition | |
* for consumers | |
*/ | |
#include <stdbool.h> | |
#define EMPTY(pool) pool->end == NULL; | |
/* As there are multiplle producer we need atomic operation | |
* * while dequeueing */ | |
#define SET_END(pool, old_node, node) __sync_bool_compare_and_swap(&(pool->end), old_node, node) | |
typedef struct pool_node { | |
struct pool_node *next; | |
struct pool_node *prev; | |
} pool_node_t; | |
struct reqpool { | |
pool_node_t *start; | |
pool_node_t *end; | |
}; | |
typedef struct reqpool reqpool_t; | |
/* cast a data from the node | |
* Example: | |
* | |
* typedef struct mydata { | |
* .... | |
* pool_node_t node; | |
* } mydata_t; | |
* | |
* enqueue: | |
* | |
* enqueue(pool, &(mydata->node)); | |
* | |
* | |
* dequeue: | |
* | |
* mydata_t *mydata; | |
* pool_node_t *node = dequeue(pool); | |
* mydata = CAST_DATA(node, mydata_t); | |
*/ | |
#define CAST_DATA(node, datatype) (datatype *)((char*)node - (sizeof(datatype) - sizeof(pool_node_t))) | |
static inline void init_pool(reqpool_t *pool) { | |
pool->start = NULL; | |
pool->end = NULL; | |
} | |
// Add node to the pool, strictly single producer | |
static inline void enqueue(reqpool_t *pool, pool_node_t *node) { | |
node->next = NULL; | |
node->prev = NULL; | |
/* we never reset pool->start to NULL even if the queue | |
* is empty so if the same address is used twice it will | |
* create the vicious loop of a->prev = a. Which would | |
* cause dequeue to return same adress multiple time | |
* */ | |
if(pool->start != NULL && pool->start != node) | |
pool->start->prev = node; | |
node->next=pool->start; | |
pool->start=node; | |
/* only successful if end is NULL */ | |
SET_END(pool, NULL, node); | |
} | |
// Dequeue node from pool | |
static inline pool_node_t * dequeue(reqpool_t *pool) { | |
pool_node_t *node = NULL; | |
node = pool->end; | |
if(node != NULL) { | |
if(!SET_END(pool, node, node->prev)) { | |
node = NULL; | |
} | |
} | |
return node; | |
} | |
/******* MORE detailed Example ******************************************************************************************* | |
#include<stdbool.h> | |
#include<stdio.h> | |
#include<stdlib.h> | |
#include<pthread.h> | |
#include<unistd.h> | |
#include <stdbool.h> | |
#include "gslb_reqpool.h" | |
typedef struct mydata { | |
int data; | |
pool_node_t node; // node links request_data in list) | |
} mydata_t; | |
static void *consumer(void *param) { | |
mydata_t *mydata = NULL; | |
reqpool_t *pool = (reqpool_t*)param; | |
pool_node_t *node = NULL; | |
while (true) { | |
node = dequeue(pool); | |
if (node != NULL) { | |
mydata = CAST_DATA(node, mydata_t); | |
printf("dequeue: %d\n", mydata->data); | |
free(mydata); | |
} | |
} | |
} | |
static void *producer(void *param) { | |
int counter = 1; | |
reqpool_t *pool = (reqpool_t*)param; | |
while(true) { | |
mydata_t *mydata = (mydata_t *)malloc(sizeof(mydata_t)); | |
mydata->data = counter; | |
enqueue(pool, &(mydata->node)); | |
counter++; | |
} | |
} | |
int main(int argc, char **argv) { | |
reqpool_t pool; | |
pthread_t p_thread; | |
pthread_t c_thread[12]; | |
char *b; | |
int i = 0; | |
init_pool(&pool); | |
pthread_create(&p_thread, NULL, producer, &pool); | |
for(i=0; i<12; i++) { | |
pthread_create(&c_thread[i], NULL, consumer, &pool); | |
} | |
pthread_join(p_thread, (void**)&b); | |
for(i=0; i<12; i++) { | |
pthread_join(c_thread[i], (void**)&b); | |
} | |
} | |
************************************************************************************************************************/ |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment