Last active
September 30, 2019 02:22
-
-
Save itarato/88a18ca7f373c10724acb1959a56ddf5 to your computer and use it in GitHub Desktop.
Non blocking queue.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <stdlib.h> | |
#include <stdio.h> | |
#include <pthread.h> | |
#define EXAMPLE_SIZE 100 | |
#define CONSUMER_COUNT 10 | |
static pthread_mutex_t q_mtx; // = PTHREAD_MUTEX_INITIALIZER; | |
static pthread_cond_t q_cond = PTHREAD_COND_INITIALIZER; | |
struct queue_link_t { | |
struct queue_link_t *next; | |
int value; | |
}; | |
struct queue_t { | |
struct queue_link_t *head; | |
int len; | |
}; | |
struct queue_link_t *new_queue_link(int value) { | |
struct queue_link_t *new = malloc(sizeof(struct queue_link_t)); | |
if (new == NULL) { | |
exit(EXIT_FAILURE); | |
} | |
new->value = value; | |
new->next = NULL; | |
return new; | |
} | |
void insert(struct queue_link_t *ql, int value) { | |
if (ql->next != NULL) { | |
insert(ql->next, value); | |
} else { | |
ql->next = new_queue_link(value); | |
} | |
} | |
void queue_insert(struct queue_t *q, int value) { | |
pthread_mutex_lock(&q_mtx); | |
if (q->head == NULL) { | |
q->head = new_queue_link(value); | |
} else { | |
insert(q->head, value); | |
} | |
q->len++; | |
pthread_mutex_unlock(&q_mtx); | |
pthread_cond_signal(&q_cond); | |
} | |
int queue_pull(struct queue_t *q) { | |
pthread_mutex_lock(&q_mtx); | |
if (q->len == 0) exit(EXIT_FAILURE); | |
if (q->head == NULL) exit(EXIT_FAILURE); | |
struct queue_link_t *ql = q->head; | |
int out = ql->value; | |
q->head = ql->next; | |
free(ql); | |
q->len--; | |
pthread_mutex_unlock(&q_mtx); | |
return out; | |
} | |
void print_queue(struct queue_link_t *ql) { | |
if (ql == NULL) { | |
printf(".\n"); | |
} else { | |
printf("%d ", ql->value); | |
print_queue(ql->next); | |
} | |
} | |
struct queue_t *make_queue() { | |
struct queue_t *q = malloc(sizeof(struct queue_t)); | |
q->len = 0; | |
q->head = NULL; | |
return q; | |
} | |
void free_queue_link(struct queue_link_t *ql) { | |
if (ql == NULL) return; | |
free_queue_link(ql->next); | |
free(ql); | |
} | |
void free_queue(struct queue_t *q) { | |
free_queue_link(q->head); | |
free(q); | |
} | |
void *producer(void *arg) { | |
struct queue_t *q = (struct queue_t *)arg; | |
for (int i = 0; i < EXAMPLE_SIZE * CONSUMER_COUNT; i++) { | |
printf("-> Start producing %d\n", i); | |
queue_insert(q, i); | |
printf("-> Done producing %d\n", i); | |
// sleep(1); | |
} | |
printf("Production has ended\n"); | |
return NULL; | |
} | |
void *consumer(void *arg) { | |
struct queue_t *q = (struct queue_t *)arg; | |
pthread_t self = pthread_self(); | |
int consumed = 0; | |
while (consumed < EXAMPLE_SIZE) { | |
printf("<- [..%lu] Start consuming\n", self % 100); | |
pthread_mutex_lock(&q_mtx); | |
while (q->len == 0) { | |
printf("Consumer is waiting\n"); | |
pthread_cond_wait(&q_cond, &q_mtx); | |
} | |
int value = queue_pull(q); | |
pthread_mutex_unlock(&q_mtx); | |
consumed++; | |
printf("<- Done consuming %d\n", value); | |
} | |
printf("Consumtion has ended\n"); | |
return NULL; | |
} | |
int main() { | |
pthread_mutexattr_t q_mtx_attr; | |
pthread_mutexattr_init(&q_mtx_attr); | |
pthread_mutexattr_settype(&q_mtx_attr, PTHREAD_MUTEX_RECURSIVE); | |
pthread_mutex_init(&q_mtx, &q_mtx_attr); | |
struct queue_t *q = make_queue(); | |
pthread_t t_producer; | |
pthread_create(&t_producer, NULL, producer, (void *)q); | |
pthread_t t_consumer[CONSUMER_COUNT]; | |
for (int i = 0; i < CONSUMER_COUNT; i++) | |
pthread_create(t_consumer + i, NULL, consumer, (void *)q); | |
pthread_join(t_producer, NULL); | |
for (int i = 0; i < CONSUMER_COUNT; i++) | |
pthread_join(t_consumer[i], NULL); | |
free_queue(q); | |
pthread_mutexattr_destroy(&q_mtx_attr); | |
exit(EXIT_SUCCESS); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment