Skip to content

Instantly share code, notes, and snippets.

@mohan43u
Last active November 23, 2020 15:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mohan43u/304a8befb1097a44203cbc51221e13bf to your computer and use it in GitHub Desktop.
Save mohan43u/304a8befb1097a44203cbc51221e13bf to your computer and use it in GitHub Desktop.
Example of a single-producer, multiple-consumer queue in C
/* compile: gcc -pthread -o spmc spmc.c */
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <errno.h>
#include <signal.h>
/*
 * One node of the singly linked queue shared by the threads in this file.
 *   id   - integer value carried by the node
 *   next - following node in the queue, or NULL when this is the last node
 */
typedef struct _simple {
    int id;
    struct _simple *next;
} simple;
/* Head of the shared queue; NULL while the queue is empty. */
simple *queue = NULL;
/* Lock the producer and consumer threads take around every queue access. */
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
/*
 * Shutdown flag: set to 1 by the SIGINT handler and polled by the worker
 * loops. volatile sig_atomic_t is the object type a signal handler may
 * portably assign to; writing a plain int from a handler is undefined.
 */
volatile sig_atomic_t stopthreads = 0;
/*
 * Append newobj at the tail of the global queue.
 *
 * Walks the chain of next-links starting at the queue head until it finds
 * the link that is NULL, then stores newobj there. For an empty queue that
 * link is the head pointer itself. newobj->next is not modified here.
 * No locking is done in this function.
 */
static void enqueue(simple *newobj) {
    simple **link = &queue;

    while (*link != NULL) {
        link = &(*link)->next;
    }
    *link = newobj;
}
static simple* dequeue() {
simple *ret = NULL;
if(queue == NULL) {
return ret;
}
ret = queue;
queue = queue->next;
return ret;
}
/*
 * Count the nodes currently linked into the global queue.
 *
 * Returns 0 for an empty queue. The whole list is walked on every call.
 * No locking is done in this function.
 */
static int queuelen() {
    int count = 0;

    for (const simple *node = queue; node != NULL; node = node->next) {
        count++;
    }
    return count;
}
static void queuefree() {
while(queue) {
printf("queuefree: queuelen=%d, obj=%p, id=%d\n", queuelen(), queue, queue->id);
simple *queue_dup = queue;
queue = queue->next;
if(queue_dup != NULL) free(queue_dup);
}
}
/*
 * Consumer thread body.
 *
 * Once per second, until stopthreads becomes 1, tries to take the queue
 * mutex without blocking. If the mutex is busy the iteration is skipped.
 * Otherwise one node is dequeued (if any), logged and freed, and the mutex
 * is released.
 *
 * Declared with the start-routine signature pthread_create expects, so the
 * thread library calls it through a matching function type. arg is unused.
 * Always returns NULL; returns early if the mutex cannot be tried at all.
 */
static void *consumer_func(void *arg) {
    (void)arg;

    while (stopthreads != 1) {
        sleep(1);

        int result = pthread_mutex_trylock(&mutex);
        if (result == EBUSY) {
            continue;
        }
        if (result != 0) {
            /* pthread calls return the error code instead of setting errno. */
            errno = result;
            perror("consumer");
            return NULL;
        }

        simple *obj = dequeue();
        if (obj != NULL) {
            printf("consumer: thread=%p, queuelen=%d, obj=%p, id=%d\n",
                   (void *)pthread_self(), queuelen(), (void *)obj, obj->id);
            free(obj);
        }

        result = pthread_mutex_unlock(&mutex);
        if (result != 0) {
            errno = result;
            perror("consumer");
        }
    }
    printf("consumer[%p]: exit\n", (void *)pthread_self());
    return NULL;
}
/*
 * Producer thread body.
 *
 * Once per second, until stopthreads becomes 1, tries to take the queue
 * mutex without blocking. If the mutex is busy the iteration is skipped.
 * Otherwise a zeroed node is allocated, given an id in the range 0..99,
 * appended to the queue and logged, and the mutex is released. An
 * allocation failure is reported and the loop carries on.
 *
 * Declared with the start-routine signature pthread_create expects, so the
 * thread library calls it through a matching function type. arg is unused.
 * Always returns NULL; returns early if the mutex cannot be tried at all.
 */
static void *producer_func(void *arg) {
    (void)arg;

    while (stopthreads != 1) {
        sleep(1);

        int result = pthread_mutex_trylock(&mutex);
        if (result == EBUSY) {
            continue;
        }
        if (result != 0) {
            /* pthread calls return the error code instead of setting errno. */
            errno = result;
            perror("producer");
            return NULL;
        }

        /* calloc zeroes the node, so obj->next starts out NULL. */
        simple *obj = calloc(1, sizeof *obj);
        if (obj == NULL) {
            perror("producer");
        } else {
            obj->id = rand() % 100;
            enqueue(obj);
            printf("producer: thread=%p, queuelen=%d, obj=%p, id=%d\n",
                   (void *)pthread_self(), queuelen(), (void *)obj, obj->id);
        }

        result = pthread_mutex_unlock(&mutex);
        if (result != 0) {
            errno = result;
            perror("producer");
        }
    }
    printf("producer[%p]: exit\n", (void *)pthread_self());
    return NULL;
}
/*
 * Signal handler: requests shutdown by setting stopthreads to 1.
 * The signal number is not inspected.
 */
static void handle_signal(int interruptno) {
    (void)interruptno;
    stopthreads = 1;
}
int main(int argc, char *argv[]) {
pthread_attr_t attr;
pthread_t producer = -1;
pthread_t consumer[10] = {};
int i = 0;
if(signal(SIGINT, (sighandler_t) handle_signal) != 0) {
perror("signal");
return 1;
}
if(pthread_attr_init(&attr) != 0) {
perror("attr_init");
return 1;
}
if(pthread_create(&producer, &attr, (void*(*)(void*))producer_func, NULL) != 0) {
perror("producer:pthread_create");
return 1;
}
for(i = 0; i < 10; i++) {
if(pthread_create(&consumer[i], &attr, (void*(*)(void*))consumer_func, NULL) != 0) {
perror("consumer:pthread_create");
return 1;
}
}
if(pthread_join(producer, NULL) != 0) {
perror("producer:join");
return 1;
}
for(i = 0; i < 10; i++) {
if(pthread_join(consumer[i], NULL) != 0) {
perror("producer:join");
return 1;
}
}
queuefree();
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment