Skip to content

Instantly share code, notes, and snippets.

@itarato
Last active September 30, 2019 02:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save itarato/88a18ca7f373c10724acb1959a56ddf5 to your computer and use it in GitHub Desktop.
Save itarato/88a18ca7f373c10724acb1959a56ddf5 to your computer and use it in GitHub Desktop.
Non-blocking queue.
#include <stdlib.h>
#include <stdio.h>
#include <pthread.h>
// Number of items each consumer thread pulls before it finishes.
#define EXAMPLE_SIZE 100
// Number of consumer threads started by main(); the producer emits
// EXAMPLE_SIZE * CONSUMER_COUNT items in total.
#define CONSUMER_COUNT 10
// Mutex guarding every access to the shared queue. It has no static
// initializer because main() initializes it at runtime with the
// PTHREAD_MUTEX_RECURSIVE attribute.
static pthread_mutex_t q_mtx;
// Condition variable signalled by queue_insert() after an item is added and
// waited on by consumer() while the queue length is zero.
static pthread_cond_t q_cond = PTHREAD_COND_INITIALIZER;
// One node of the singly linked list that stores the queued values.
struct queue_link_t {
// Following node in the chain, or NULL when this node is the last one.
struct queue_link_t *next;
// Integer payload carried by this node.
int value;
};
// FIFO queue of ints: values are appended at the end of the chain and
// removed from `head`.
struct queue_t {
// First (oldest) node, or NULL when the queue holds nothing.
struct queue_link_t *head;
// Number of nodes currently linked from `head`; incremented by
// queue_insert() and decremented by queue_pull().
int len;
};
// Allocate a detached list node that carries `value`.
//
// The returned node has `next` set to NULL and is owned by the caller.
// The function never returns NULL: when the allocation fails the whole
// process is terminated with EXIT_FAILURE.
struct queue_link_t *new_queue_link(int value) {
    struct queue_link_t *node = malloc(sizeof *node);
    if (!node) {
        exit(EXIT_FAILURE);
    }
    node->next = NULL;
    node->value = value;
    return node;
}
// Append a new node holding `value` after the last node of the chain that
// starts at `ql`.
//
// `ql` must not be NULL; it is dereferenced unconditionally. The chain is
// walked with a loop instead of recursion so that stack usage stays constant
// no matter how many nodes are linked (C gives no guarantee that a tail call
// is turned into a jump).
void insert(struct queue_link_t *ql, int value) {
    struct queue_link_t *tail = ql;
    while (tail->next != NULL) {
        tail = tail->next;
    }
    tail->next = new_queue_link(value);
}
void queue_insert(struct queue_t *q, int value) {
pthread_mutex_lock(&q_mtx);
if (q->head == NULL) {
q->head = new_queue_link(value);
} else {
insert(q->head, value);
}
q->len++;
pthread_mutex_unlock(&q_mtx);
pthread_cond_signal(&q_cond);
}
// Detach the first node of `q`, release it and return the value it held.
//
// Terminates the process with EXIT_FAILURE when the queue is empty (`len` is
// zero or `head` is NULL). The function takes q_mtx on its own; consumer()
// calls it while already holding q_mtx, which works because main() creates
// the mutex as PTHREAD_MUTEX_RECURSIVE.
int queue_pull(struct queue_t *q) {
    pthread_mutex_lock(&q_mtx);

    if (q->len == 0 || q->head == NULL) {
        exit(EXIT_FAILURE);
    }

    struct queue_link_t *front = q->head;
    int out = front->value;
    q->head = front->next;
    q->len -= 1;
    free(front);

    pthread_mutex_unlock(&q_mtx);
    return out;
}
// Print every value of the chain starting at `ql` to stdout, each followed by
// a space, and finish the line with ".\n". A NULL `ql` prints just ".\n".
//
// Implemented as a loop so that printing a long chain does not consume one
// stack frame per node.
void print_queue(struct queue_link_t *ql) {
    for (struct queue_link_t *cur = ql; cur != NULL; cur = cur->next) {
        printf("%d ", cur->value);
    }
    printf(".\n");
}
// Allocate an empty queue (no nodes, length 0).
//
// The caller owns the result and releases it with free_queue(). The function
// never returns NULL: if the allocation fails the process is terminated with
// EXIT_FAILURE, the same policy new_queue_link() applies to nodes.
struct queue_t *make_queue(void) {
    struct queue_t *q = malloc(sizeof *q);
    if (q == NULL) {
        exit(EXIT_FAILURE);
    }
    q->head = NULL;
    q->len = 0;
    return q;
}
// Free `ql` and every node reachable from it through `next`.
// Passing NULL is allowed and does nothing.
//
// The chain is released iteratively; the successor is saved before each
// free() so no freed node is ever read, and stack usage does not grow with
// the length of the chain.
void free_queue_link(struct queue_link_t *ql) {
    while (ql != NULL) {
        struct queue_link_t *next = ql->next;
        free(ql);
        ql = next;
    }
}
// Release queue `q` together with all nodes still linked from it.
//
// A NULL `q` is accepted and ignored, mirroring free(). The function does not
// take q_mtx, so no other thread may be using the queue when it is called.
void free_queue(struct queue_t *q) {
    if (q == NULL) {
        return;
    }
    free_queue_link(q->head);
    free(q);
}
// Thread entry point for the producer.
//
// `arg` is the shared `struct queue_t *`. Inserts the integers
// 0 .. EXAMPLE_SIZE * CONSUMER_COUNT - 1 in ascending order, logging before
// and after each insertion, and always returns NULL.
void *producer(void *arg) {
    struct queue_t *q = arg;
    const int total = EXAMPLE_SIZE * CONSUMER_COUNT;

    int i = 0;
    while (i < total) {
        printf("-> Start producing %d\n", i);
        queue_insert(q, i);
        printf("-> Done producing %d\n", i);
        i++;
    }

    printf("Production has ended\n");
    return NULL;
}
// Thread entry point for a consumer.
//
// `arg` is the shared `struct queue_t *`. Pulls exactly EXAMPLE_SIZE values
// from the queue, blocking on q_cond whenever the queue is empty, and always
// returns NULL.
void *consumer(void *arg) {
    struct queue_t *q = arg;

    // pthread_t is an opaque type, so it is converted explicitly before being
    // used in arithmetic and passed to "%lu". The result is only a short tag
    // for the log lines, not a portable thread identifier.
    unsigned long tag = (unsigned long)pthread_self() % 100;

    for (int consumed = 0; consumed < EXAMPLE_SIZE; consumed++) {
        printf("<- [..%lu] Start consuming\n", tag);

        pthread_mutex_lock(&q_mtx);
        // Re-test the predicate after every wake-up: another consumer may
        // have taken the item first, and spurious wake-ups are permitted.
        while (q->len == 0) {
            printf("Consumer is waiting\n");
            pthread_cond_wait(&q_cond, &q_mtx);
        }
        // queue_pull() locks q_mtx again while it is held here; main()
        // creates the mutex as recursive, which makes this nested lock legal.
        int value = queue_pull(q);
        pthread_mutex_unlock(&q_mtx);

        printf("<- Done consuming %d\n", value);
    }

    printf("Consumtion has ended\n");
    return NULL;
}
int main() {
pthread_mutexattr_t q_mtx_attr;
pthread_mutexattr_init(&q_mtx_attr);
pthread_mutexattr_settype(&q_mtx_attr, PTHREAD_MUTEX_RECURSIVE);
pthread_mutex_init(&q_mtx, &q_mtx_attr);
struct queue_t *q = make_queue();
pthread_t t_producer;
pthread_create(&t_producer, NULL, producer, (void *)q);
pthread_t t_consumer[CONSUMER_COUNT];
for (int i = 0; i < CONSUMER_COUNT; i++)
pthread_create(t_consumer + i, NULL, consumer, (void *)q);
pthread_join(t_producer, NULL);
for (int i = 0; i < CONSUMER_COUNT; i++)
pthread_join(t_consumer[i], NULL);
free_queue(q);
pthread_mutexattr_destroy(&q_mtx_attr);
exit(EXIT_SUCCESS);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment