Skip to content

Instantly share code, notes, and snippets.

@buffet
Created May 23, 2019 22:05
Show Gist options
  • Save buffet/3630d6ffd113a3f57d0ffcfca266d24f to your computer and use it in GitHub Desktop.
Save buffet/3630d6ffd113a3f57d0ffcfca266d24f to your computer and use it in GitHub Desktop.
Producer/Consumer
/* Copyright (c), Niclas Meyer <niclas@countingsort.com>
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this file,
* You can obtain one at https://mozilla.org/MPL/2.0/.
*/
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <pthread.h>
#include <semaphore.h>
/*
 * Fixed-capacity circular queue of size_t items.
 *
 * A slot holding 0 is treated as free by print_buffer(), which is why
 * produce_item() never hands out 0.
 */
struct ring_buffer {
    size_t *buffer; /* backing storage, `size` slots long */
    size_t front;   /* index the next put_item_into_buffer() writes to */
    size_t rear;    /* index the next take_item_from_buffer() reads from */
    size_t size;    /* number of slots in `buffer` */
};
/*
 * State shared by every worker thread; main() allocates a single instance
 * and passes the same pointer to each pthread_create() call.
 */
struct thread_arg {
    struct ring_buffer rbuffer;   /* the queue itself */
    pthread_mutex_t buffer_mutex; /* locked around put/take on `rbuffer` */
    sem_t filled_count;           /* posted by producers, waited on by consumers */
    sem_t empty_count;            /* posted by consumers, try-waited on by producers */
};
// Thread entry points passed to pthread_create() in main().
// Their bodies are not spelled out in C: the macro layer further down
// rewrites the pseudocode pulled in via #include "_" into these two functions.
static void *producer(void *arg);
static void *consumer(void *arg);
/*
 * Write the command-line synopsis to stderr, then terminate the process
 * with EXIT_FAILURE. This function does not return.
 *
 * argv0: program name to show in the synopsis (normally argv[0]).
 */
static void
usage(const char *argv0)
{
    fprintf(stderr, "Usage: %s PRODUCER_COUNT CONSUMER_COUNT [BUFFER_SIZE]\n", argv0);
    exit(EXIT_FAILURE);
}
/**
 * Produces an item
 *
 * Draws a pseudo-random value, announces it on stdout and returns it.
 * The result is always in [1, RAND_MAX + 1]: 0 is reserved because
 * print_buffer() interprets a 0 slot as "free".
 *
 * Returns the freshly produced, non-zero item.
 */
static size_t
produce_item(void)
{
    // Widen BEFORE adding: rand() returns int, and on platforms where
    // RAND_MAX == INT_MAX the int expression rand() + 1 overflows (UB).
    size_t item = (size_t)rand() + 1;

    printf("Producing item... %zu\n", item);
    return item;
}
/**
 * Consumes an item
 *
 * "Consuming" here just means reporting the value on stdout.
 *
 * item: the value taken out of the buffer.
 */
static void
consume_item(size_t item)
{
    fprintf(stdout, "Consuming %zu...\n", item);
}
/**
 * Prints a given buffer (x for used slots, - for free slots)
 *
 * Output has the form "[ x - x ]" with no trailing newline, so callers can
 * append their own text on the same line. A slot counts as used when its
 * value is non-zero.
 *
 * rb: buffer to render; only read, never modified.
 */
static void
print_buffer(struct ring_buffer *rb)
{
    fputs("[ ", stdout);

    for (size_t slot = 0; slot != rb->size; slot++) {
        const char *mark = rb->buffer[slot] ? "x " : "- ";
        fputs(mark, stdout);
    }

    fputs("]", stdout);
}
/**
 * Puts an item into a buffer
 *
 * Stores `item` at the write index, advances that index with wrap-around,
 * and prints the resulting buffer state followed by " <- item".
 *
 * The function neither checks for a full buffer nor locks the buffer;
 * both are left to the caller. Only stdout is locked, so that the
 * two-part status line is not interleaved with other threads' output.
 *
 * item: value to store.
 * rb:   buffer to store it in.
 */
static void
put_item_into_buffer(size_t item, struct ring_buffer *rb)
{
    const size_t slot = rb->front;

    rb->buffer[slot] = item;
    rb->front = (slot + 1) % rb->size;

    flockfile(stdout);
    print_buffer(rb);
    printf(" <- %zu\n", item);
    funlockfile(stdout);
}
/**
 * Take an item from a given buffer
 *
 * Reads the value at the read index, resets that slot to 0 (the "free"
 * marker used by print_buffer()), advances the index with wrap-around,
 * and prints the resulting buffer state followed by " -> item".
 *
 * The function neither checks for an empty buffer nor locks the buffer;
 * both are left to the caller. Only stdout is locked while printing.
 *
 * rb: buffer to take from.
 *
 * Returns the value that was stored in the slot.
 */
static size_t
take_item_from_buffer(struct ring_buffer *rb)
{
    size_t *slot = &rb->buffer[rb->rear];
    const size_t taken = *slot;

    *slot = 0;
    rb->rear = (rb->rear + 1) % rb->size;

    flockfile(stdout);
    print_buffer(rb);
    printf(" -> %zu\n", taken);
    funlockfile(stdout);

    return taken;
}
/*
 * Entry point.
 *
 * Usage: PROGRAM PRODUCER_COUNT CONSUMER_COUNT [BUFFER_SIZE]
 *
 * Parses the thread counts and the optional buffer size, sets up the one
 * shared thread_arg (ring buffer, mutex, two semaphores), spawns the
 * consumers and then the producers, and finally blocks on the thread that
 * was created last.
 *
 * BUFFER_SIZE falls back to 10 when it is missing or does not start with a
 * number. An explicit 0 is rejected with the usage message: a buffer
 * without slots can never hold an item, and calloc(0, ...) may legitimately
 * return NULL, which would otherwise be misreported as out-of-memory.
 *
 * Exits with EXIT_FAILURE on bad arguments or any setup failure.
 */
int
main(int argc, char **argv)
{
    srand((unsigned)time(NULL)); // seed RNG

    // Parse arguments
    if (argc < 3) {
        usage(argv[0]);
    }

    size_t producer_count;
    if (sscanf(argv[1], "%zu", &producer_count) != 1) {
        usage(argv[0]);
    }

    size_t consumer_count;
    if (sscanf(argv[2], "%zu", &consumer_count) != 1) {
        usage(argv[0]);
    }

    size_t buffer_size;
    if (argc < 4 || sscanf(argv[3], "%zu", &buffer_size) != 1) {
        buffer_size = 10;
    }
    if (buffer_size == 0) {
        usage(argv[0]);
    }

    // With no threads requested there is nothing to run, and `thread`
    // below would still be uninitialised when handed to pthread_join().
    if (producer_count == 0 && consumer_count == 0) {
        return EXIT_SUCCESS;
    }

    pthread_t thread; // only the most recently created handle is kept

    // Initialize targ
    struct thread_arg *targ = malloc(sizeof(*targ)); // one arg for all
    if (!targ) {
        fprintf(stderr, "Failed to allocate memory\n");
        exit(EXIT_FAILURE);
    }

    // Initialize buffer (zeroed: 0 marks a free slot)
    targ->rbuffer.buffer = calloc(buffer_size, sizeof(*targ->rbuffer.buffer));
    if (!targ->rbuffer.buffer) {
        fprintf(stderr, "Failed to allocate memory\n");
        exit(EXIT_FAILURE);
    }
    targ->rbuffer.front = 0;
    targ->rbuffer.rear = 0;
    targ->rbuffer.size = buffer_size;

    // empty_count starts at the capacity, filled_count at zero
    if (pthread_mutex_init(&targ->buffer_mutex, NULL) != 0
        || sem_init(&targ->empty_count, 0, buffer_size) < 0
        || sem_init(&targ->filled_count, 0, 0) < 0) {
        fprintf(stderr, "Failed to initialize AlP4/5 stuff\n");
        exit(EXIT_FAILURE);
    }

    // Spawn consumers
    for (size_t i = 0; i < consumer_count; ++i) {
        if (pthread_create(&thread, NULL, consumer, targ) != 0) {
            fprintf(stderr, "Failed to spawn thread\n");
            exit(EXIT_FAILURE);
        }
    }

    // Spawn producers
    for (size_t i = 0; i < producer_count; ++i) {
        if (pthread_create(&thread, NULL, producer, targ) != 0) {
            fprintf(stderr, "Failed to spawn thread\n");
            exit(EXIT_FAILURE);
        }
    }

    // Wait for the last thread, infinitely: the workers loop forever, so
    // this keeps the process (and with it all other threads) alive.
    if (pthread_join(thread, NULL) != 0) {
        fprintf(stderr, "Failed to join thread\n");
        exit(EXIT_FAILURE);
    }

    return EXIT_SUCCESS;
}
// Translation layer: the macros below turn the Wikipedia pseudocode that is
// pulled in by the #include at the end of this section into valid C.
// Order matters, and several macros deliberately emit unbalanced braces
// that are balanced by another macro or by the pseudocode itself.
//
// Some simple translations
#define procedure static void *
// BUFFER_SIZE only initialises the pseudocode's placeholder `emptyCount`
// int; the capacity actually used is the value main() passes to sem_init().
#define BUFFER_SIZE 0
// The pseudocode's global mutex/semaphores become plain ints; the real
// primitives live in struct thread_arg.
#define mutex int
#define semaphore int
#define produceItem produce_item
// These need to be macro functions to consume the empty ()
// (being function-like, they leave uses of the bare names untouched)
#define producer() producer(void *arg)
#define consumer() consumer(void *arg)
// put_item_into_buffer/take_item_from_buffer take the buffer
#define putItemIntoBuffer(x) put_item_into_buffer(x, &targ->rbuffer)
#define removeItemFromBuffer() take_item_from_buffer(&targ->rbuffer)
// Wikipedia uses down and up for both mutexes and semaphores.
// First idea was to re#define emptyCount etc. instead,
// but those are both used with down() and with up(),
// so I instead translate into a second layer of macros
// to do the specific calls directly:
// down(x)/up(x) paste the argument's name onto a down_/up_ prefix.
#define down(x) down_##x
#define up(x) up_##x
#define down_buffer_mutex pthread_mutex_lock(&targ->buffer_mutex)
#define down_fillCount sem_wait(&targ->filled_count)
#define up_buffer_mutex pthread_mutex_unlock(&targ->buffer_mutex)
#define up_emptyCount sem_post(&targ->empty_count)
// Since we're not allowed to wait, we throw away computed values if the buffer is full.
// Note the unclosed `else {`: it is closed by the first `}` in up_fillCount.
#define down_emptyCount if (sem_trywait(&targ->empty_count) < 0) printf("Buffer full. Throwing away...\n"); else {
// Suppress warnings by adding a return after both while loops
// (just for warnings).
// The `}` closes the while body, pthread_exit() follows the loop, and the
// trailing `{` opens an empty block for the pseudocode's own closing brace.
#define consumeItem(x) consume_item(x); } pthread_exit(NULL); {
// up_fillCount needs to contain the closing } for down_emptyCount's else;
// the second } closes the while body, as in consumeItem above.
#define up_fillCount sem_post(&targ->filled_count); }} pthread_exit(NULL); {
// We need to define a few variables. Easiest way to achieve this
// is to redefine while and add the declarations before that.
// __LINE__ is evaluated where `while` is used, i.e. inside the included
// file: line 14 there is presumably the producer's loop -- confirm against
// that file's layout before adding or removing lines in it.
// NOTE(review): redefining a keyword is only tolerable because every
// standard header is included above this point; keep it that way.
#define while if (__LINE__ == 14) printf("New producer\n"); \
else printf("New consumer\n"); \
size_t item; struct thread_arg *targ = arg; while
// Pull in the pseudocode file (literally named "_") to be translated.
#include "_"
/*
* Taken from
* https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem#Using_semaphores
*/
mutex buffer_mutex; // similar to "semaphore buffer_mutex = 1", but different (see notes below)
semaphore fillCount = 0; // becomes a plain int via the `semaphore` macro; down()/up() only paste this name, they never read the variable
semaphore emptyCount = BUFFER_SIZE; // likewise a placeholder int; the real counter is thread_arg.empty_count
procedure producer() // expands to: static void *producer(void *arg)
{
while (true) // `while` macro: prints the role banner and declares `item` and `targ` (line-number sensitive: keep comments trailing, add no lines)
{
item = produceItem(); // produce_item()
down(emptyCount); // sem_trywait(empty_count); on failure the item is dropped, otherwise opens an else-block
down(buffer_mutex); // pthread_mutex_lock(buffer_mutex)
putItemIntoBuffer(item); // put_item_into_buffer(item, &targ->rbuffer)
up(buffer_mutex); // pthread_mutex_unlock(buffer_mutex)
up(fillCount); // sem_post(filled_count); also closes the else-block and the loop body, then emits pthread_exit(NULL)
}
}
procedure consumer() // expands to: static void *consumer(void *arg)
{
while (true) // `while` macro: prints the role banner and declares `item` and `targ`
{
down(fillCount); // sem_wait(filled_count): blocks until a producer has posted an item
down(buffer_mutex); // pthread_mutex_lock(buffer_mutex)
item = removeItemFromBuffer(); // take_item_from_buffer(&targ->rbuffer)
up(buffer_mutex); // pthread_mutex_unlock(buffer_mutex)
up(emptyCount); // sem_post(empty_count): hands the freed slot back to the producers
consumeItem(item); // consume_item(item); also closes the loop body, then emits pthread_exit(NULL)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment