Skip to content

Instantly share code, notes, and snippets.

@chergert
Last active July 20, 2022 14:43
Show Gist options
  • Save chergert/6018824 to your computer and use it in GitHub Desktop.
Save chergert/6018824 to your computer and use it in GitHub Desktop.
A thread-safe blocking LIFO or FIFO queue with maximum backlog.
#include <limits.h>
#include <pthread.h>
#include <stdlib.h>
typedef struct _queue_t queue_t;
typedef struct _queue_node_t queue_node_t;
struct _queue_node_t
{
queue_node_t *next;
queue_node_t *prev;
void *data;
};
struct _queue_t
{
pthread_mutex_t mutex;
pthread_cond_t rd_cond; /* notify readers of new items */
pthread_cond_t wr_cond; /* notify writers of more space */
queue_node_t *head; /* double linked list head */
queue_node_t *tail; /* double linked list tail */
unsigned length; /* number of queued items */
unsigned max_length; /* max number of queued items */
};
queue_t *
queue_new (unsigned max_length)
{
queue_t *q;
if (!max_length) {
max_length = UINT_MAX;
}
q = calloc(1, sizeof *q);
pthread_mutex_init(&q->mutex, NULL);
pthread_cond_init(&q->rd_cond, NULL);
pthread_cond_init(&q->wr_cond, NULL);
q->max_length = max_length;
q->length = 0;
q->head = NULL;
q->tail = NULL;
return q;
}
static inline void
wait_for_space (queue_t *queue)
{
while (queue->length == queue->max_length) {
pthread_cond_wait(&queue->wr_cond, &queue->mutex);
}
}
static inline void
wait_for_item (queue_t *queue)
{
while (!queue->head) {
pthread_cond_wait(&queue->rd_cond, &queue->mutex);
}
}
void
queue_push_head (queue_t *queue,
void *data)
{
queue_node_t *node;
node = calloc(1, sizeof *node);
node->data = data;
node->prev = NULL;
node->next = NULL;
pthread_mutex_lock(&queue->mutex);
wait_for_space(queue);
if ((node->next = queue->head)) {
node->next->prev = node;
}
queue->head = node;
if (!queue->tail) {
queue->tail = node;
}
queue->length++;
pthread_cond_signal(&queue->rd_cond);
pthread_mutex_unlock(&queue->mutex);
}
void
queue_push_tail (queue_t *queue,
void *data)
{
queue_node_t *node;
node = calloc(1, sizeof *node);
node->data = data;
node->next = NULL;
pthread_mutex_lock(&queue->mutex);
wait_for_space(queue);
if ((node->prev = queue->tail)) {
queue->tail->next = node;
}
queue->tail = node;
if (!queue->head) {
queue->head = node;
}
queue->length++;
pthread_cond_signal(&queue->rd_cond);
pthread_mutex_unlock(&queue->mutex);
}
void *
queue_pop_head (queue_t *queue)
{
queue_node_t *node;
void *ret = NULL;
pthread_mutex_lock(&queue->mutex);
wait_for_item(queue);
node = queue->head;
if ((queue->head = node->next)) {
queue->head->prev = NULL;
}
if (queue->tail == node) {
queue->tail = NULL;
}
queue->length--;
pthread_cond_signal(&queue->wr_cond);
pthread_mutex_unlock(&queue->mutex);
if (node) {
ret = node->data;
free(node);
}
return ret;
}
void *
queue_pop_tail (queue_t *queue)
{
queue_node_t *node;
void *ret = NULL;
pthread_mutex_lock(&queue->mutex);
wait_for_item(queue);
node = queue->tail;
if ((queue->tail = node->prev)) {
queue->tail->next = NULL;
}
if (queue->head == node) {
queue->head = NULL;
}
queue->length--;
pthread_cond_signal(&queue->wr_cond);
pthread_mutex_unlock(&queue->mutex);
if (node) {
ret = node->data;
free(node);
}
return ret;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment