#include <stdlib.h> | |
#include <pthread.h> | |
/*
 * Fixed-capacity circular FIFO of ints, shared between threads.
 *
 * buffer_enqueue() and buffer_dequeue() hold `lock` while they read or
 * modify the other fields, and both wait on `cond` (enqueue while the
 * buffer is full, dequeue while it is empty).
 */
typedef struct {
    int size;             /* number of elements currently stored */
    int capacity;         /* number of slots in `storage` */
    int index;            /* slot the next enqueue writes to */
    int *storage;         /* heap array allocated by buffer_init(), freed by buffer_destroy() */
    pthread_mutex_t lock; /* held by enqueue/dequeue around every access to the fields above */
    pthread_cond_t cond;  /* waited on while full or empty; notified after every enqueue/dequeue */
} buffer;
/*
 * Initializes `buff` as an empty buffer with room for `capacity` ints.
 *
 * Returns 1 on success. Returns 0 if `capacity` is less than 1, if the
 * storage cannot be allocated, or if the mutex or the condition variable
 * cannot be initialized. On failure nothing is left allocated and `buff`
 * must not be passed to buffer_destroy().
 */
int buffer_init(buffer *buff, int capacity) {
    if (capacity < 1) {
        return 0;
    }

    /* calloc checks the count * size multiplication for overflow */
    int *storage = calloc((size_t)capacity, sizeof *storage);
    if (storage == NULL) {
        return 0;
    }

    if (pthread_mutex_init(&buff->lock, NULL) != 0) {
        free(storage);
        return 0;
    }
    if (pthread_cond_init(&buff->cond, NULL) != 0) {
        pthread_mutex_destroy(&buff->lock);
        free(storage);
        return 0;
    }

    buff->size = 0;
    buff->capacity = capacity;
    buff->index = 0;
    buff->storage = storage;
    return 1;
}
/*
 * Releases what buffer_init() set up: frees the storage array and destroys
 * the mutex and the condition variable. `buff` itself is not freed.
 *
 * No other thread may still be using the buffer when this is called.
 */
void buffer_destroy(buffer *buff) {
    free(buff->storage);
    buff->storage = NULL; /* do not leave a dangling pointer behind */
    pthread_mutex_destroy(&buff->lock);
    pthread_cond_destroy(&buff->cond);
}
/*
 * Returns 1 when the buffer holds `capacity` elements, 0 otherwise.
 * Takes no lock itself; buffer_enqueue() calls it with `lock` held.
 */
int buffer_full(buffer *buff) {
    return buff->size == buff->capacity;
}
/*
 * Returns 1 when the buffer holds no elements, 0 otherwise.
 * Takes no lock itself; buffer_dequeue() calls it with `lock` held.
 */
int buffer_empty(buffer *buff) {
    return buff->size == 0;
}
/* Enqueues `data` in the buffer. Blocks until there is room (threadsafe). */ | |
void buffer_enqueue(buffer *buff, int data) { | |
/* acquire lock on data structure */ | |
pthread_mutex_lock(&(buff->lock)); | |
/* wait until there is room in the buffer */ | |
while (buffer_full(buff)) { | |
pthread_cond_wait(&(buff->cond), &(buff->lock)); | |
} | |
/* insert data into the buffer */ | |
buff->storage[buff->index] = data; | |
buff->index = (buff->index + 1) % buff->capacity; | |
buff->size++; | |
/* signal condition and release lock */ | |
pthread_cond_signal(&buff->cond); | |
pthread_mutex_unlock(&buff->lock); | |
} | |
/* Enqueues `data` in the buffer. Blocks until there is room (threadsafe). */ | |
int buffer_dequeue(buffer *buff) { | |
pthread_mutex_lock(&(buff->lock)); | |
while (buffer_empty(buff)) { | |
pthread_cond_wait(&(buff->cond), &(buff->lock)); | |
} | |
int i = buff->index - buff->size; | |
if (i < 0) { | |
i += buff->capacity; | |
} | |
int data = buff->storage[i]; | |
buff->size--; | |
/* signal condition and release lock */ | |
pthread_cond_signal(&buff->cond); | |
pthread_mutex_unlock(&buff->lock); | |
return data; | |
} |
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>

#include "buffer.c"
/* Durations in seconds; each one is passed to sleep(). */
#define PRODUCE_TIME 5     /* delay before a producer generates each value */
#define CONSUME_TIME 2     /* delay after a consumer takes each value */
#define SIMULATION_TIME 20 /* how long main() sleeps after starting the threads */
void *producer(void *p) { | |
buffer *buff = (buffer*) p; | |
/* constantly produce data */ | |
while (1) { | |
/* simulate work to produce data */ | |
sleep(PRODUCE_TIME); | |
/* produce data (integer between 1 and 5) */ | |
int data = rand() % 5 + 1; | |
printf("Produced data: %i\n", data); | |
fflush(stdout); | |
/* insert data into the buffer */ | |
buffer_enqueue(buff, data); | |
} | |
} | |
void *consumer(void *p) { | |
buffer *buff = (buffer*) p; | |
/* constantly consume data */ | |
while (1) { | |
/* retrieve data */ | |
int data = buffer_dequeue(buff); | |
/* simulate work to consume data */ | |
sleep(CONSUME_TIME); | |
printf("Consumed data: %i\n", data); | |
fflush(stdout); | |
} | |
} | |
int main(int argc, char **argv) { | |
/* check arguments */ | |
if (argc != 4) { | |
printf("Usage:\n"); | |
printf("%s producers consumers buffer_capacity\n", argv[0]); | |
return 1; | |
} | |
/* seed rand with the current time */ | |
srand(time(NULL)); | |
/* get number of producers */ | |
int producers = atoi(argv[1]); | |
if (producers < 1) { | |
producers = 1; | |
} | |
/* get number of consumers */ | |
int consumers = atoi(argv[2]); | |
if (consumers < 1) { | |
consumers = 1; | |
} | |
/* get buffer capacity */ | |
int capacity = atoi(argv[3]); | |
if (capacity < 1) { | |
capacity = 1; | |
} | |
printf("Producers: %i\n", producers); | |
printf("Consumers: %i\n", consumers); | |
printf("Capacity: %i\n", capacity); | |
buffer buff; | |
buffer_init(&buff, capacity); | |
pthread_attr_t attr; | |
pthread_attr_init(&attr); | |
int i; | |
for (i = 0; i < producers; i++) { | |
pthread_t producer_thread; | |
pthread_create(&producer_thread, &attr, producer, (void*)&buff); | |
} | |
for (i = 0; i < consumers; i++) { | |
pthread_t consumer_thread; | |
pthread_create(&consumer_thread, &attr, consumer, (void*)&buff); | |
} | |
/* wait until simulation is over */ | |
sleep(SIMULATION_TIME); | |
buffer_destroy(&buff); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment