-
-
Save buffet/3630d6ffd113a3f57d0ffcfca266d24f to your computer and use it in GitHub Desktop.
Producer/Consumer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* Copyright (c), Niclas Meyer <niclas@countingsort.com>
 *
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this file,
 * You can obtain one at https://mozilla.org/MPL/2.0/.
 */
// Request the POSIX.1-2008 API (flockfile, semaphores, pthreads) even when
// compiling in strict ISO C mode; this must precede every system header.
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L
#endif

#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include <pthread.h>
#include <semaphore.h>
/**
 * Fixed-size ring buffer of size_t items.
 *
 * A slot holding 0 is treated as free: take_item_from_buffer() zeroes the slot
 * it reads and print_buffer() shows zero slots as "-". The struct carries no
 * synchronisation of its own.
 */
struct ring_buffer {
    size_t *buffer; // slot storage, `size` elements
    size_t front;   // index put_item_into_buffer() writes to next
    size_t rear;    // index take_item_from_buffer() reads from next
    size_t size;    // number of slots; used as the modulus, so must not be 0
};
struct thread_arg { | |
struct ring_buffer rbuffer; | |
pthread_mutex_t buffer_mutex; | |
sem_t filled_count; | |
sem_t empty_count; | |
}; | |
/*
 * Thread entry points passed to pthread_create() in main(). `arg` is the
 * shared struct thread_arg.
 */
static void *producer(void *arg);
static void *consumer(void *arg);
/**
 * Print the usage line to stderr and terminate the process.
 *
 * @param argv0 program name to show in the message (argv[0])
 *
 * Never returns: always calls exit(EXIT_FAILURE).
 */
static void
usage(const char *argv0)
{
    fprintf(
        stderr,
        "Usage: %s PRODUCER_COUNT CONSUMER_COUNT [BUFFER_SIZE]\n",
        argv0);
    exit(EXIT_FAILURE);
}
/**
 * Produces an item.
 *
 * Draws a pseudo-random number, announces it on stdout and returns it.
 *
 * @return a value in [1, RAND_MAX + 1]; never 0, because 0 marks a free slot
 *         in the ring buffer
 */
static size_t
produce_item(void)
{
    // Widen before adding: rand() may return RAND_MAX, which can equal
    // INT_MAX, and `rand() + 1` would then overflow a signed int.
    size_t item = (size_t)rand() + 1; // can't be 0 this way
    printf("Producing item... %zu\n", item);
    return item;
}
/**
 * Consumes an item.
 *
 * Consumption is only simulated: the item is reported on stdout.
 *
 * @param item the value taken from the ring buffer
 */
static void
consume_item(size_t item)
{
    printf("Consuming %zu...\n", item);
}
/** | |
* Prints a given buffer (x for used slots, - for free slots) | |
*/ | |
static void | |
print_buffer(struct ring_buffer *rb) | |
{ | |
printf("[ "); | |
for (size_t i = 0; i < rb->size; ++i) { | |
if (rb->buffer[i] != 0) { | |
printf("x "); | |
} else { | |
printf("- "); | |
} | |
} | |
printf("]"); | |
} | |
/** | |
* Puts an item into a buffer | |
*/ | |
static void | |
put_item_into_buffer(size_t item, struct ring_buffer *rb) | |
{ | |
rb->buffer[rb->front] = item; | |
rb->front = (rb->front + 1) % rb->size; | |
flockfile(stdout); | |
print_buffer(rb); | |
printf(" <- %zu\n", item); | |
funlockfile(stdout); | |
} | |
/** | |
* Take an item from a given buffer | |
*/ | |
static size_t | |
take_item_from_buffer(struct ring_buffer *rb) | |
{ | |
size_t item = rb->buffer[rb->rear]; | |
rb->buffer[rb->rear] = 0; | |
rb->rear = (rb->rear + 1) % rb->size; | |
flockfile(stdout); | |
print_buffer(rb); | |
printf(" -> %zu\n", item); | |
funlockfile(stdout); | |
return item; | |
} | |
int | |
main(int argc, char **argv) | |
{ | |
srand(time(NULL)); // seed RNG | |
// Parse arguments | |
if (argc < 3) { | |
usage(argv[0]); | |
} | |
size_t producer_count; | |
if (sscanf(argv[1], "%zu", &producer_count) != 1) { | |
usage(argv[0]); | |
} | |
size_t consumer_count; | |
if (sscanf(argv[2], "%zu", &consumer_count) != 1) { | |
usage(argv[0]); | |
} | |
size_t buffer_size; | |
if (argc < 4 || sscanf(argv[3], "%zu", &buffer_size) != 1) { | |
buffer_size = 10; | |
} | |
pthread_t thread; // we don't need the tids | |
// Initialize targ | |
struct thread_arg *targ = malloc(sizeof(*targ)); // one arg for all | |
if (!targ) { | |
fprintf(stderr, "Failed to allocate memory\n"); | |
exit(EXIT_FAILURE); | |
} | |
// Initialize buffer | |
targ->rbuffer.buffer = calloc(buffer_size, sizeof(*targ->rbuffer.buffer)); | |
if (!targ->rbuffer.buffer) { | |
fprintf(stderr, "Failed to allocate memory\n"); | |
exit(EXIT_FAILURE); | |
} | |
targ->rbuffer.front = 0; | |
targ->rbuffer.rear = 0; | |
targ->rbuffer.size = buffer_size; | |
if (pthread_mutex_init(&targ->buffer_mutex, NULL) != 0 | |
|| sem_init(&targ->empty_count, 0, buffer_size) < 0 | |
|| sem_init(&targ->filled_count, 0, 0) < 0) { | |
fprintf(stderr, "Failed to initialize AlP4/5 stuff\n"); | |
exit(EXIT_FAILURE); | |
} | |
// Spawn consumers | |
for (size_t i = 0; i < consumer_count; ++i) { | |
if (pthread_create(&thread, NULL, consumer, targ) != 0) { | |
fprintf(stderr, "Failed to spawn thread\n"); | |
exit(EXIT_FAILURE); | |
} | |
} | |
// Spawn producers | |
for (size_t i = 0; i < producer_count; ++i) { | |
if (pthread_create(&thread, NULL, producer, targ) != 0) { | |
fprintf(stderr, "Failed to spawn thread\n"); | |
exit(EXIT_FAILURE); | |
} | |
} | |
pthread_join(thread, NULL); // wait for the last thread, infinitely | |
} | |
/*
 * Translation layer that lets the Wikipedia producer/consumer pseudo-code
 * compile as C when it is placed after this file.
 *
 * NOTE: these macros redefine the keyword `while` and common words such as
 * `mutex`; no header may be included after this point.
 */

// Some simple translations
#define procedure static void *
#define BUFFER_SIZE 0
#define mutex int
#define semaphore int
#define produceItem produce_item

// These need to be macro functions to consume the empty ()
#define producer() producer(void *arg)
#define consumer() consumer(void *arg)

// put_item_into_buffer/take_item_from_buffer take the buffer
#define putItemIntoBuffer(x) put_item_into_buffer(x, &targ->rbuffer)
#define removeItemFromBuffer() take_item_from_buffer(&targ->rbuffer)

// Wikipedia uses down and up for both mutexes and semaphores.
// First idea was to re#define emptyCount etc. instead,
// but those are both used with down() and with up(),
// so I instead translate into a second layer of macros
// to do the specific calls directly.
#define down(x) down_##x
#define up(x) up_##x

#define down_buffer_mutex pthread_mutex_lock(&targ->buffer_mutex)
#define down_fillCount sem_wait(&targ->filled_count)
#define up_buffer_mutex pthread_mutex_unlock(&targ->buffer_mutex)
#define up_emptyCount sem_post(&targ->empty_count)

// Since we're not allowed to wait, we throw away computed values if the buffer is full.
// The `else {` opened here is closed by up_fillCount.
#define down_emptyCount if (sem_trywait(&targ->empty_count) < 0) printf("Buffer full. Throwing away...\n"); else {

// Suppress warnings by adding a return after both while loops
// (just for warnings): the first } closes the loop body, and the trailing {
// pairs with the } that closes the loop body in the pseudo-code.
#define consumeItem(x) consume_item(x); } pthread_exit(NULL); {

// up_fillCount needs to contain the closing } for down_emptyCount's else
#define up_fillCount sem_post(&targ->filled_count); }} pthread_exit(NULL); {

// We need to define a few variables. Easiest way to achieve this
// is to redefine while and add the declarations before that.
// The role is announced based on the enclosing function's name (__func__),
// so it does not depend on which source line the loop is written on.
#define while if (strcmp(__func__, "producer") == 0) printf("New producer\n"); \
              else printf("New consumer\n"); \
              size_t item; struct thread_arg *targ = arg; while
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include "_" | |
/*
 * Taken from
 * https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem#Using_semaphores
 */
mutex buffer_mutex; // similar to "semaphore buffer_mutex = 1", but different (see notes below) | |
semaphore fillCount = 0; | |
semaphore emptyCount = BUFFER_SIZE; | |
procedure producer() | |
{ | |
while (true) | |
{ | |
item = produceItem(); | |
down(emptyCount); | |
down(buffer_mutex); | |
putItemIntoBuffer(item); | |
up(buffer_mutex); | |
up(fillCount); | |
} | |
} | |
procedure consumer() | |
{ | |
while (true) | |
{ | |
down(fillCount); | |
down(buffer_mutex); | |
item = removeItemFromBuffer(); | |
up(buffer_mutex); | |
up(emptyCount); | |
consumeItem(item); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment