Last active
July 20, 2022 14:43
-
-
Save chergert/6018824 to your computer and use it in GitHub Desktop.
A thread-safe blocking LIFO or FIFO queue with maximum backlog.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <limits.h> | |
#include <pthread.h> | |
#include <stdlib.h> | |
typedef struct _queue_t queue_t; | |
typedef struct _queue_node_t queue_node_t; | |
struct _queue_node_t | |
{ | |
queue_node_t *next; | |
queue_node_t *prev; | |
void *data; | |
}; | |
struct _queue_t | |
{ | |
pthread_mutex_t mutex; | |
pthread_cond_t rd_cond; /* notify readers of new items */ | |
pthread_cond_t wr_cond; /* notify writers of more space */ | |
queue_node_t *head; /* double linked list head */ | |
queue_node_t *tail; /* double linked list tail */ | |
unsigned length; /* number of queued items */ | |
unsigned max_length; /* max number of queued items */ | |
}; | |
queue_t * | |
queue_new (unsigned max_length) | |
{ | |
queue_t *q; | |
if (!max_length) { | |
max_length = UINT_MAX; | |
} | |
q = calloc(1, sizeof *q); | |
pthread_mutex_init(&q->mutex, NULL); | |
pthread_cond_init(&q->rd_cond, NULL); | |
pthread_cond_init(&q->wr_cond, NULL); | |
q->max_length = max_length; | |
q->length = 0; | |
q->head = NULL; | |
q->tail = NULL; | |
return q; | |
} | |
static inline void | |
wait_for_space (queue_t *queue) | |
{ | |
while (queue->length == queue->max_length) { | |
pthread_cond_wait(&queue->wr_cond, &queue->mutex); | |
} | |
} | |
static inline void | |
wait_for_item (queue_t *queue) | |
{ | |
while (!queue->head) { | |
pthread_cond_wait(&queue->rd_cond, &queue->mutex); | |
} | |
} | |
void | |
queue_push_head (queue_t *queue, | |
void *data) | |
{ | |
queue_node_t *node; | |
node = calloc(1, sizeof *node); | |
node->data = data; | |
node->prev = NULL; | |
node->next = NULL; | |
pthread_mutex_lock(&queue->mutex); | |
wait_for_space(queue); | |
if ((node->next = queue->head)) { | |
node->next->prev = node; | |
} | |
queue->head = node; | |
if (!queue->tail) { | |
queue->tail = node; | |
} | |
queue->length++; | |
pthread_cond_signal(&queue->rd_cond); | |
pthread_mutex_unlock(&queue->mutex); | |
} | |
void | |
queue_push_tail (queue_t *queue, | |
void *data) | |
{ | |
queue_node_t *node; | |
node = calloc(1, sizeof *node); | |
node->data = data; | |
node->next = NULL; | |
pthread_mutex_lock(&queue->mutex); | |
wait_for_space(queue); | |
if ((node->prev = queue->tail)) { | |
queue->tail->next = node; | |
} | |
queue->tail = node; | |
if (!queue->head) { | |
queue->head = node; | |
} | |
queue->length++; | |
pthread_cond_signal(&queue->rd_cond); | |
pthread_mutex_unlock(&queue->mutex); | |
} | |
void * | |
queue_pop_head (queue_t *queue) | |
{ | |
queue_node_t *node; | |
void *ret = NULL; | |
pthread_mutex_lock(&queue->mutex); | |
wait_for_item(queue); | |
node = queue->head; | |
if ((queue->head = node->next)) { | |
queue->head->prev = NULL; | |
} | |
if (queue->tail == node) { | |
queue->tail = NULL; | |
} | |
queue->length--; | |
pthread_cond_signal(&queue->wr_cond); | |
pthread_mutex_unlock(&queue->mutex); | |
if (node) { | |
ret = node->data; | |
free(node); | |
} | |
return ret; | |
} | |
void * | |
queue_pop_tail (queue_t *queue) | |
{ | |
queue_node_t *node; | |
void *ret = NULL; | |
pthread_mutex_lock(&queue->mutex); | |
wait_for_item(queue); | |
node = queue->tail; | |
if ((queue->tail = node->prev)) { | |
queue->tail->next = NULL; | |
} | |
if (queue->head == node) { | |
queue->head = NULL; | |
} | |
queue->length--; | |
pthread_cond_signal(&queue->wr_cond); | |
pthread_mutex_unlock(&queue->mutex); | |
if (node) { | |
ret = node->data; | |
free(node); | |
} | |
return ret; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment