Last active
December 18, 2021 04:49
-
-
Save zedchance/7445f4b438792e533870fd5ae9a1dbc2 to your computer and use it in GitHub Desktop.
Bounded buffer synchronization problem
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// bounded buffer problem | |
#include <stdio.h> | |
#include <stdlib.h> | |
#include <string.h> | |
#include <unistd.h> | |
#include <semaphore.h> | |
#include <pthread.h> | |
#include "buffer.h" | |
unsigned int SEED = 12345; // seed for RNG | |
unsigned int WAIT_TIME = 100000; // wait time modulus | |
buffer_item buffer[BUFFER_SIZE]; // shared buffer | |
int size = 0, head = -1, tail = -1; // buffer pointers | |
int produced_items = 0, consumed_items = 0; // producer/consumer counters | |
long total_items = 0; // number of items to process | |
pthread_mutex_t mutex; // binary semaphore for mutual exclusion | |
sem_t empty, full; // counting semaphores for buffer slots | |
int is_empty() | |
{ | |
return size == 0; | |
} | |
int is_full() | |
{ | |
return size == BUFFER_SIZE; | |
} | |
int insert_item(buffer_item item) | |
{ | |
if (is_full()) | |
{ | |
return -1; | |
} | |
if (size == 0) | |
{ | |
head = 0; | |
tail = 0; | |
} | |
else | |
{ | |
tail = (tail + 1) % BUFFER_SIZE; | |
} | |
buffer[tail] = item; | |
size++; | |
return 0; | |
} | |
int remove_item(buffer_item * item) | |
{ | |
if (is_empty()) | |
{ | |
return -1; | |
} | |
*item = buffer[head]; | |
head = (head + 1) % BUFFER_SIZE; | |
size--; | |
if (size == 0) | |
{ | |
head = -1; | |
tail = -1; | |
} | |
return 0; | |
} | |
void print_buffer() | |
{ | |
for (int i = 0; i < BUFFER_SIZE; i++) | |
{ | |
printf("%3d ", buffer[i]); | |
} | |
printf("\t(size %d)\n", size); | |
} | |
void * producer(void * param) | |
{ | |
int id = *(int *) param; | |
buffer_item item; | |
while (1) | |
{ | |
// wait for random time | |
usleep(rand_r(&SEED) % WAIT_TIME); | |
// check if we're done | |
if (produced_items == total_items) | |
{ | |
pthread_exit(0); | |
} | |
// wait for an empty spot | |
// sem_trywait returns -1 if it is blocked | |
if (sem_trywait(&empty) == 0) | |
{ | |
// acquire lock | |
pthread_mutex_lock(&mutex); | |
// insert item | |
item = rand_r(&SEED) % 256; | |
if (insert_item(item)) | |
{ | |
fprintf(stderr, "producer %3d error\n", id); | |
} | |
else | |
{ | |
printf("producer %3d produced %3d\n", id, item); | |
produced_items++; | |
} | |
// release | |
pthread_mutex_unlock(&mutex); | |
sem_post(&full); | |
} | |
} | |
} | |
void * consumer(void * param) | |
{ | |
int id = *(int *) param; | |
buffer_item item; | |
while (1) | |
{ | |
// wait for random time | |
usleep(rand_r(&SEED) % WAIT_TIME); | |
// check if we're done | |
if (consumed_items == total_items) | |
{ | |
pthread_exit(0); | |
} | |
// wait for a filled spot | |
// sem_trywait returns -1 if it is blocked | |
if (sem_trywait(&full) == 0) | |
{ | |
// acquire lock | |
pthread_mutex_lock(&mutex); | |
// consume item | |
if (remove_item(&item)) | |
{ | |
fprintf(stderr, "consumer %3d error\n", id); | |
} | |
else | |
{ | |
printf("consumer %3d consumed %3d\n", id, item); | |
consumed_items++; | |
} | |
// release | |
pthread_mutex_unlock(&mutex); | |
sem_post(&empty); | |
} | |
} | |
} | |
int main(int argc, char ** argv) | |
{ | |
// get args | |
if (argc < 4) | |
{ | |
fprintf(stderr, "Not enough arguments.\n"); | |
fprintf(stderr, "Usage: %s pcount ccount nitems\n", argv[0]); | |
exit(1); | |
} | |
long p_count = strtol(argv[1], NULL, 10); | |
long c_count = strtol(argv[2], NULL, 10); | |
total_items = strtol(argv[3], NULL, 10); | |
// initialize semaphores | |
pthread_mutex_init(&mutex, NULL); | |
sem_init(&empty, 0, BUFFER_SIZE); | |
sem_init(&full, 0, 0); | |
// setup threads | |
printf("Setting up %ld producer thread(s)\n", p_count); | |
pthread_attr_t attr; | |
pthread_attr_init(&attr); | |
pthread_t producers[p_count]; | |
int producers_id[p_count]; | |
for (int i = 0; i < p_count; i++) | |
{ | |
producers_id[i] = i; | |
pthread_create(&producers[i], &attr, producer, producers_id + i); | |
} | |
printf("Setting up %ld consumer thread(s)\n", c_count); | |
pthread_t consumers[c_count]; | |
int consumers_id[c_count]; | |
for (int i = 0; i < c_count; i++) | |
{ | |
consumers_id[i] = i; | |
pthread_create(&consumers[i], &attr, consumer, consumers_id + i); | |
} | |
// wait for threads to exit | |
printf("Joining threads\n"); | |
for (int i = 0; i < p_count; i++) | |
{ | |
pthread_join(producers[i], NULL); | |
} | |
for (int i = 0; i < c_count; i++) | |
{ | |
pthread_join(consumers[i], NULL); | |
} | |
printf("completed %ld items\n", total_items); | |
// clean up | |
pthread_mutex_destroy(&mutex); | |
sem_destroy(&full); | |
sem_destroy(&empty); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// buffer header file | |
typedef int buffer_item; | |
#define BUFFER_SIZE 5 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
all: buffer | |
buffer: buffer.h buffer.c | |
clang buffer.c -o buffer -lpthread | |
test: buffer | |
./buffer 1 1 5 | |
test2: buffer | |
./buffer 20 20 20 | |
clean: | |
rm buffer |
Author
zedchance
commented
Nov 9, 2021
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment