Skip to content

Instantly share code, notes, and snippets.

@alexcoco
Created April 2, 2014 15:07
Show Gist options
  • Save alexcoco/e22503c9461adead2b7b to your computer and use it in GitHub Desktop.
Save alexcoco/e22503c9461adead2b7b to your computer and use it in GitHub Desktop.
#include <stdlib.h>
#include <pthread.h>
/*
 * Bounded circular queue of ints shared between threads.
 *
 * `lock` is taken around every enqueue and dequeue; `cond` is the single
 * condition variable that both blocked producers (queue full) and blocked
 * consumers (queue empty) wait on.
 */
typedef struct buffer {
    int size;             /* number of items currently stored */
    int capacity;         /* number of int slots in `storage` */
    int index;            /* slot the next enqueue writes to */
    int *storage;         /* heap array allocated by buffer_init */
    pthread_mutex_t lock; /* held while the fields above are read or written by enqueue/dequeue */
    pthread_cond_t cond;  /* waited on while full/empty, notified after each operation */
} buffer;
/*
 * Initializes `buff` as an empty queue able to hold `capacity` ints.
 *
 * Allocates the backing array and initializes the mutex and the condition
 * variable with default attributes.
 *
 * Returns 1 on success. Returns 0 if `buff` is NULL, `capacity` is less
 * than 1, the allocation fails, or either pthread object cannot be
 * initialized. On failure nothing is left allocated and `buff` must not be
 * passed to the other buffer functions.
 */
int buffer_init(buffer *buff, int capacity) {
    if (buff == NULL || capacity < 1) {
        return 0;
    }

    /* calloc checks the count * size multiplication for overflow */
    int *storage = calloc((size_t)capacity, sizeof *storage);
    if (storage == NULL) {
        return 0;
    }

    if (pthread_mutex_init(&buff->lock, NULL) != 0) {
        free(storage);
        return 0;
    }

    if (pthread_cond_init(&buff->cond, NULL) != 0) {
        /* undo the mutex so a failed init leaves no live resources behind */
        pthread_mutex_destroy(&buff->lock);
        free(storage);
        return 0;
    }

    buff->size = 0;
    buff->capacity = capacity;
    buff->index = 0;
    buff->storage = storage;
    return 1;
}
/*
 * Tears down a buffer: frees the storage array, then destroys the mutex and
 * the condition variable. The `buffer` object itself is not freed.
 */
void buffer_destroy(buffer *buff) {
    pthread_mutex_t *mtx = &buff->lock;
    pthread_cond_t *cv = &buff->cond;

    free(buff->storage);
    pthread_mutex_destroy(mtx);
    pthread_cond_destroy(cv);
}
/*
 * Returns nonzero when the number of stored items equals the capacity,
 * zero otherwise. Does not take the lock itself.
 */
int buffer_full(buffer *buff) {
    const int used = buff->size;
    const int limit = buff->capacity;

    return used == limit;
}
/*
 * Returns nonzero when no items are stored, zero otherwise.
 * Does not take the lock itself.
 */
int buffer_empty(buffer *buff) {
    const int used = buff->size;

    return used == 0;
}
/*
 * Enqueues `data` in the buffer. Blocks until there is room (threadsafe).
 *
 * The item is written at `index`, which then advances by one and wraps
 * around at `capacity`.
 */
void buffer_enqueue(buffer *buff, int data) {
    /* acquire lock on data structure */
    pthread_mutex_lock(&buff->lock);

    /* wait until there is room; loop because a wakeup does not guarantee it */
    while (buffer_full(buff)) {
        pthread_cond_wait(&buff->cond, &buff->lock);
    }

    /* insert data into the buffer */
    buff->storage[buff->index] = data;
    buff->index = (buff->index + 1) % buff->capacity;
    buff->size++;

    /*
     * Producers and consumers wait on the same condition variable, so a
     * single signal could be delivered to another producer, which would go
     * straight back to sleep and leave a consumer waiting forever. Wake all
     * waiters; each one re-checks its own predicate under the lock.
     */
    pthread_cond_broadcast(&buff->cond);
    pthread_mutex_unlock(&buff->lock);
}
/*
 * Dequeues and returns the oldest item in the buffer. Blocks until an item
 * is available (threadsafe).
 */
int buffer_dequeue(buffer *buff) {
    /* acquire lock on data structure */
    pthread_mutex_lock(&buff->lock);

    /* wait until there is data; loop because a wakeup does not guarantee it */
    while (buffer_empty(buff)) {
        pthread_cond_wait(&buff->cond, &buff->lock);
    }

    /*
     * `index` is the next write slot, so the oldest item sits `size` slots
     * behind it; add `capacity` when that position wraps below zero.
     */
    int oldest = buff->index - buff->size;
    if (oldest < 0) {
        oldest += buff->capacity;
    }
    int data = buff->storage[oldest];
    buff->size--;

    /*
     * Producers and consumers wait on the same condition variable, so a
     * single signal could be delivered to another consumer, which would go
     * straight back to sleep and leave a producer waiting forever. Wake all
     * waiters; each one re-checks its own predicate under the lock.
     */
    pthread_cond_broadcast(&buff->cond);
    pthread_mutex_unlock(&buff->lock);
    return data;
}
#include <limits.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>

#include "buffer.c"
/* Simulation timings; each value is passed to sleep(), i.e. whole seconds. */
enum {
    PRODUCE_TIME = 5,    /* pause before a producer generates each item */
    CONSUME_TIME = 2,    /* pause after a consumer retrieves each item */
    SIMULATION_TIME = 20 /* how long main lets the worker threads run */
};
/*
 * Thread entry point for a producer. `p` is the shared `buffer *`.
 *
 * Loops forever: sleeps PRODUCE_TIME seconds, draws a value in 1..5 from
 * rand(), prints it, and enqueues it (blocking while the buffer is full).
 * Never returns.
 */
void *producer(void *p) {
    buffer *const queue = p;

    for (;;) {
        /* simulated work before an item becomes available */
        sleep(PRODUCE_TIME);

        const int value = 1 + rand() % 5;
        printf("Produced data: %i\n", value);
        fflush(stdout);

        buffer_enqueue(queue, value);
    }
}
/*
 * Thread entry point for a consumer. `p` is the shared `buffer *`.
 *
 * Loops forever: dequeues a value (blocking while the buffer is empty),
 * sleeps CONSUME_TIME seconds, then prints the value. Never returns.
 */
void *consumer(void *p) {
    buffer *const queue = p;

    for (;;) {
        const int value = buffer_dequeue(queue);

        /* simulated work on the retrieved item */
        sleep(CONSUME_TIME);

        printf("Consumed data: %i\n", value);
        fflush(stdout);
    }
}
/*
 * Parses a decimal count from `arg` and clamps it to [1, INT_MAX].
 * Text with no leading number parses as 0 and therefore yields 1.
 */
static int parse_count(const char *arg) {
    /* strtol, unlike atoi, has defined behaviour for out-of-range input */
    long value = strtol(arg, NULL, 10);

    if (value < 1) {
        return 1;
    }
    if (value > INT_MAX) {
        return INT_MAX;
    }
    return (int)value;
}

/*
 * Runs the producer/consumer simulation.
 *
 * Usage: <program> producers consumers buffer_capacity
 * Each argument is clamped to a minimum of 1. Starts the requested number
 * of producer and consumer threads on one shared buffer, lets them run for
 * SIMULATION_TIME seconds, then exits the process.
 *
 * Returns 0 on success, 1 on a usage error or when the buffer or a thread
 * cannot be created.
 */
int main(int argc, char **argv) {
    /* check arguments */
    if (argc != 4) {
        printf("Usage:\n");
        printf("%s producers consumers buffer_capacity\n", argv[0]);
        return 1;
    }

    /* seed rand with the current time */
    srand((unsigned int)time(NULL));

    int producers = parse_count(argv[1]);
    int consumers = parse_count(argv[2]);
    int capacity = parse_count(argv[3]);

    printf("Producers: %i\n", producers);
    printf("Consumers: %i\n", consumers);
    printf("Capacity: %i\n", capacity);

    buffer buff;
    if (!buffer_init(&buff, capacity)) {
        fprintf(stderr, "Failed to initialize buffer\n");
        return 1;
    }

    pthread_attr_t attr;
    if (pthread_attr_init(&attr) != 0) {
        fprintf(stderr, "Failed to initialize thread attributes\n");
        buffer_destroy(&buff);
        return 1;
    }

    for (int i = 0; i < producers; i++) {
        pthread_t producer_thread;
        int err = pthread_create(&producer_thread, &attr, producer, (void *)&buff);
        if (err != 0) {
            fprintf(stderr, "Failed to create producer thread (error %d)\n", err);
            pthread_attr_destroy(&attr);
            return 1;
        }
    }
    for (int i = 0; i < consumers; i++) {
        pthread_t consumer_thread;
        int err = pthread_create(&consumer_thread, &attr, consumer, (void *)&buff);
        if (err != 0) {
            fprintf(stderr, "Failed to create consumer thread (error %d)\n", err);
            pthread_attr_destroy(&attr);
            return 1;
        }
    }
    pthread_attr_destroy(&attr);

    /* wait until simulation is over */
    sleep(SIMULATION_TIME);

    /*
     * The worker threads never exit and may still be blocked on, or using,
     * the buffer at this point. Destroying its mutex/condition variable or
     * freeing its storage underneath them would be undefined behaviour, so
     * the buffer is deliberately left alone: returning from main ends the
     * process, which terminates the threads and releases the memory.
     */
    return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment