Skip to content

Instantly share code, notes, and snippets.

@albertnetymk
Created August 14, 2014 22:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save albertnetymk/8b46bc53c8adfb364be8 to your computer and use it in GitHub Desktop.
Save albertnetymk/8b46bc53c8adfb364be8 to your computer and use it in GitHub Desktop.
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <stdbool.h>
#include <unistd.h>
#include <sched.h>
// Number of slots in the ring buffer (size of fifo.array).
#define N 100
// Number of values the producer thread writes before closing the stream.
#define MSG 1000
// Number of consumer threads started by main; also the subscriber count
// passed to stream_create().
#define THREAD 10
// Shorthand used for indices and counters throughout this file.
typedef unsigned int uint;
// Forward declaration so list nodes can point at streams before the stream
// type itself is defined.
struct stream_struct;
// One node of the singly linked subscriber list.
typedef struct listobj_struct {
// Reader handle stored in this node.
struct stream_struct *s;
// Next node, or NULL for the last node.
struct listobj_struct *next;
} listobj;
// Singly linked list of subscribers; append_list() adds nodes at the tail.
typedef struct list_struct {
// First node, or NULL when the list is empty.
listobj *head;
// Last node, or NULL when the list is empty.
listobj *tail;
} list;
// Shared ring buffer written by stream_write() and read by every subscribed
// stream through stream_read().
typedef struct fifo_struct {
// Storage for the buffered values.
int array[N];
// Slot the next stream_write() will store into; wraps from N - 1 back to 0.
_Atomic uint index;
// Set by stream_close(); checked by stream_finished().
_Atomic bool finished;
// While true (and the write slot is below N - 1) stream_write() skips its
// reader-position check. Cleared when the writer wraps, set again once every
// expected subscriber has wrapped too.
_Atomic bool ahead;
// Set by subscribe() once n_subscriber reaches total_subscriber; the writer
// spins on it before taking its slow path.
_Atomic bool subscribers_full;
// Number of subscribers expected, as given to stream_create().
uint total_subscriber;
// Count of readers that have wrapped from slot N - 1 to slot 0 since the
// counter was last reset; updated under roundup_lock.
uint roundup;
pthread_mutex_t roundup_lock;
// Number of subscribers registered so far; updated under subscribe_lock.
uint n_subscriber;
// List of all subscribed streams; appended to under subscribe_lock.
list *subscribers;
pthread_mutex_t subscribe_lock;
} fifo;
// Per-reader handle onto a fifo, created by subscribe().
typedef struct stream_struct {
// The ring buffer this reader consumes from.
fifo *buffer;
// Slot this reader will read next; the writer inspects it via contains().
_Atomic uint index;
} stream;
// Register a new reader on h and return its handle.
stream *subscribe(fifo *h);
// NOTE: unsubscribe is declared here only as a comment; no implementation
// appears in this file.
// void unsubscribe(stream *s);
// Return the next value for this reader, spinning until one is available.
int stream_read(stream *s);
// True when the reader has caught up with the writer and the stream is closed.
bool stream_finished(stream *s);
// Store v into the ring buffer and advance the write slot.
void stream_write(fifo *h, int v);
// Allocate a ring buffer that expects n subscribers.
fifo *stream_create(uint n);
// Mark the stream as closed.
void stream_close(fifo *h);
// Append s to the tail of l.
void append_list(list *l, stream *s);
// True if any stream in l currently has read slot `needle`.
bool contains(list *l, uint needle);
// The single ring buffer shared by the producer and all consumer threads;
// assigned in main before any thread is started.
fifo *handle;
// Producer thread entry point: pushes the integers 0 .. MSG - 1 into the
// global `handle` in ascending order, then closes the stream.
// `arg` is unused. The thread terminates through pthread_exit(NULL).
void *thread_producer(void *arg)
{
    int value = 0;

    while (value < MSG) {
        stream_write(handle, value);
        ++value;
    }

    // Signal the readers that no further values will arrive.
    stream_close(handle);
    pthread_exit(NULL);
}
void* thread_consumer(void * arg)
{
stream *s = subscribe(handle);
int expect = 0;
int v;
while(!stream_finished(s)) {
v = stream_read(s);
if (expect != v) {
printf("ERROR ID: %lu received %d but expect %d\n",
pthread_self(), v, expect);
} else {
// printf("OK ID: %lu received %d\n",
// pthread_self(), v);
}
expect++;
}
pthread_exit(NULL);
}
// Program entry point: creates the shared ring buffer sized for THREAD
// subscribers, starts one producer thread and THREAD consumer threads, and
// then leaves the main thread with pthread_exit() so the process keeps
// running until the worker threads are done.
// Any pthread_create failure prints a message and exits with status -1.
int main(void)
{
    pthread_t tid;  // thread ids are never used afterwards, so one slot suffices
    int started = 0;

    handle = stream_create(THREAD);

    if (pthread_create(&tid, NULL, thread_producer, NULL) != 0) {
        printf("ERROR in creating pthread\n");
        exit(-1);
    }

    while (started < THREAD) {
        if (pthread_create(&tid, NULL, thread_consumer, NULL) != 0) {
            printf("ERROR in creating pthread\n");
            exit(-1);
        }
        ++started;
    }

    pthread_exit(NULL);
}
// Allocate and initialise a ring buffer that expects `n` subscribers.
//
// Returns a heap-allocated fifo with an empty subscriber list, write slot 0,
// `ahead` set and both mutexes initialised. On allocation or mutex
// initialisation failure a message is printed and the process exits with
// status 1. No function in this file frees the returned buffer.
fifo *stream_create(uint n)
{
    fifo *h = malloc(sizeof *h);
    if (h == NULL) {
        printf("\n fifo allocation failed\n");
        exit(1);
    }

    h->subscribers = malloc(sizeof *h->subscribers);
    if (h->subscribers == NULL) {
        printf("\n subscriber list allocation failed\n");
        exit(1);
    }
    h->subscribers->head = h->subscribers->tail = NULL;

    h->index = 0;
    h->finished = false;
    h->ahead = true;
    // malloc does not zero memory, so this flag must be set explicitly.
    // With zero expected subscribers nobody would ever set it, so start it
    // as true to keep the writer from waiting forever.
    h->subscribers_full = (n == 0);
    h->total_subscriber = n;
    h->n_subscriber = 0;
    h->roundup = 0;

    // Both locks are used later: subscribe_lock by subscribe() and
    // roundup_lock by stream_read().
    if (pthread_mutex_init(&h->subscribe_lock, NULL) != 0 ||
        pthread_mutex_init(&h->roundup_lock, NULL) != 0) {
        printf("\n mutex init failed\n");
        exit(1);
    }
    return h;
}
// Register a new reader on `h`.
//
// Returns a heap-allocated stream whose read slot starts at 0. The stream is
// appended to h->subscribers and the subscriber count is incremented under
// subscribe_lock. When the count reaches h->total_subscriber, the
// `subscribers_full` flag is set. If the stream cannot be allocated, a
// message is printed and the process exits with status 1.
stream *subscribe(fifo *h)
{
    stream *s = malloc(sizeof *s);
    if (s == NULL) {
        printf("\n stream allocation failed\n");
        exit(1);
    }
    s->buffer = h;
    s->index = 0;

    pthread_mutex_lock(&h->subscribe_lock);
    append_list(h->subscribers, s);
    h->n_subscriber++;
    if (h->n_subscriber == h->total_subscriber) {
        // Set only after the append so the writer sees a complete list.
        h->subscribers_full = true;
    }
    pthread_mutex_unlock(&h->subscribe_lock);

    return s;
}
// Report whether any stream stored in `l` currently has its read slot equal
// to `needle`. Walks the list from head to tail and stops at the first match.
bool contains(list *l, uint needle)
{
    for (listobj *node = l->head; node != NULL; node = node->next) {
        if (node->s->index == needle) {
            return true;
        }
    }
    return false;
}
// Store `v` in the current write slot of `h` and advance the slot, wrapping
// from N - 1 back to 0.
//
// While `ahead` is set and the write slot is below N - 1, the value is stored
// without looking at the readers. Otherwise the writer first spins until all
// expected subscribers have registered, then spins while any subscriber's
// read slot equals the slot the writer is about to advance onto.
// Wrapping clears `ahead`.
void stream_write(fifo *h, int v)
{
    bool fast_path = h->ahead && h->index < (N - 1);

    if (!fast_path) {
        while (!h->subscribers_full) {
            sched_yield();
        }
        // The slot the write index will move to after this store.
        uint next_slot = (h->index + 1) % N;
        while (contains(h->subscribers, next_slot)) {
            sched_yield();
        }
    }

    h->array[h->index] = v;

    if (h->index + 1 != N) {
        h->index++;
        return;
    }

    // Wrapping: drop the fast path before publishing the new write slot.
    h->ahead = false;
    h->index = 0;
}
// Return the next value for reader `s`, spinning (with sched_yield) while the
// reader's slot equals the buffer's write slot, i.e. while nothing new is
// available.
//
// After reading, the reader's slot advances, wrapping from N - 1 to 0. On a
// wrap the shared `roundup` counter is incremented under roundup_lock. Once
// it equals total_subscriber, it is reset and the buffer's `ahead` flag is
// set again.
int stream_read(stream *s)
{
    fifo *buf = s->buffer;

    while (s->index == buf->index) {
        sched_yield();
    }

    int value = buf->array[s->index];

    if (s->index + 1 != N) {
        s->index++;
        return value;
    }

    // Wrap-around path: publish the new slot first, then record the wrap.
    s->index = 0;
    pthread_mutex_lock(&buf->roundup_lock);
    buf->roundup++;
    if (buf->roundup == buf->total_subscriber) {
        buf->roundup = 0;
        buf->ahead = true;
    }
    pthread_mutex_unlock(&buf->roundup_lock);

    return value;
}
// Mark the stream behind `h` as closed by setting its atomic `finished` flag.
// Nothing is freed and no waiting thread is woken here; readers observe the
// flag through stream_finished().
void stream_close(fifo *h)
{
h->finished = true;
}
// Report whether reader `s` is done: the stream has been closed and the
// reader's slot has caught up with the buffer's write slot.
//
// The `finished` flag is loaded BEFORE the emptiness check. Checking
// emptiness first would let a final stream_write() + stream_close() slip in
// between the two loads, so the function would report "finished" while a
// value was still unread. This relies on the producer calling stream_close()
// only after its last stream_write(), as thread_producer does.
bool stream_finished(stream *s)
{
    if (!s->buffer->finished) {
        return false;
    }
    // The stream is closed, so the write slot can no longer move.
    return s->index == s->buffer->index;
}
// Append a new node holding `s` to the tail of `l`.
//
// The list stores the pointer only; it does not copy the stream. This
// function does no locking of its own; subscribe() calls it while holding
// subscribe_lock. If the node cannot be allocated, a message is printed and
// the process exits with status 1.
void append_list(list *l, stream *s)
{
    listobj *o = malloc(sizeof *o);
    if (o == NULL) {
        printf("\n list node allocation failed\n");
        exit(1);
    }
    o->s = s;
    o->next = NULL;

    if (l->head == NULL) {
        // First node: it is both head and tail.
        l->head = l->tail = o;
    } else {
        l->tail->next = o;
        l->tail = o;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment