Created
August 14, 2014 22:46
-
-
Save albertnetymk/8b46bc53c8adfb364be8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <stdio.h> | |
#include <pthread.h> | |
#include <stdlib.h> | |
#include <stdbool.h> | |
#include <unistd.h> | |
#include <sched.h> | |
/* Compile-time settings for the stream demo. */
enum {
    N = 100,     /* number of slots in each fifo's ring array */
    MSG = 1000,  /* how many integers the producer thread writes */
    THREAD = 10  /* consumer threads started, and subscribers the fifo expects */
};

/* Short alias for unsigned int; used for ring indices and counters. */
typedef unsigned uint;
/* Only pointers to a stream are stored here, so an incomplete type is enough. */
struct stream_struct;

typedef struct listobj_struct listobj;
typedef struct list_struct list;

/* One node of the singly linked list of subscribed streams. */
struct listobj_struct {
    struct stream_struct *s; /* stream this node refers to */
    listobj *next;           /* following node, NULL for the last one */
};

/* Singly linked list addressed through its first and last node. */
struct list_struct {
    listobj *head; /* first node, NULL while the list is empty */
    listobj *tail; /* last node, NULL while the list is empty */
};
typedef struct fifo_struct { | |
int array[N]; | |
_Atomic uint index; | |
_Atomic bool finished; | |
_Atomic bool ahead; | |
_Atomic bool subscribers_full; | |
uint total_subscriber; | |
uint roundup; | |
pthread_mutex_t roundup_lock; | |
uint n_subscriber; | |
list *subscribers; | |
pthread_mutex_t subscribe_lock; | |
} fifo; | |
typedef struct stream_struct { | |
fifo *buffer; | |
_Atomic uint index; | |
} stream; | |
stream *subscribe(fifo *h); | |
// void unsubscribe(stream *s); | |
int stream_read(stream *s); | |
bool stream_finished(stream *s); | |
void stream_write(fifo *h, int v); | |
fifo *stream_create(uint n); | |
void stream_close(fifo *h); | |
void append_list(list *l, stream *s); | |
bool contains(list *l, uint needle); | |
fifo *handle; | |
void* thread_producer(void * arg) | |
{ | |
for (int i = 0; i < MSG; ++i) { | |
stream_write(handle, i); | |
} | |
stream_close(handle); | |
pthread_exit(NULL); | |
} | |
void* thread_consumer(void * arg) | |
{ | |
stream *s = subscribe(handle); | |
int expect = 0; | |
int v; | |
while(!stream_finished(s)) { | |
v = stream_read(s); | |
if (expect != v) { | |
printf("ERROR ID: %lu received %d but expect %d\n", | |
pthread_self(), v, expect); | |
} else { | |
// printf("OK ID: %lu received %d\n", | |
// pthread_self(), v); | |
} | |
expect++; | |
} | |
pthread_exit(NULL); | |
} | |
int main (void) { | |
handle = stream_create(THREAD); | |
pthread_t producer; | |
if (pthread_create(&producer, NULL, &thread_producer, NULL)) { | |
printf("ERROR in creating pthread\n"); | |
exit(-1); | |
} | |
for (int i = 0; i < THREAD; ++i) { | |
pthread_t consumer; | |
if (pthread_create(&consumer, NULL, &thread_consumer, NULL)) { | |
printf("ERROR in creating pthread\n"); | |
exit(-1); | |
} | |
} | |
pthread_exit(NULL); | |
} | |
/*
 * Allocates and initialises a fifo that expects `n` subscribers.
 *
 * Every field read by the other stream functions is given a defined value
 * here, including `subscribers_full` and `roundup_lock`: stream_write()
 * polls the former and stream_read() locks the latter.
 *
 * Returns the new fifo. Nothing in this file frees it. On allocation or
 * mutex-initialisation failure a message is printed and the process exits
 * with status 1.
 */
fifo *stream_create(uint n)
{
    fifo *h = malloc(sizeof *h);
    if (h == NULL) {
        printf("\n out of memory\n");
        exit(1);
    }

    h->subscribers = malloc(sizeof *h->subscribers);
    if (h->subscribers == NULL) {
        printf("\n out of memory\n");
        exit(1);
    }
    h->subscribers->head = h->subscribers->tail = NULL;

    h->index = 0;
    h->finished = false;
    h->ahead = true;
    /* With zero expected subscribers nobody will ever flip this flag in
       subscribe(), so start "full" to keep the writer from waiting forever. */
    h->subscribers_full = (n == 0);
    h->total_subscriber = n;
    h->n_subscriber = 0;
    h->roundup = 0;

    if (pthread_mutex_init(&h->subscribe_lock, NULL) != 0) {
        printf("\n mutex init failed\n");
        exit(1);
    }
    if (pthread_mutex_init(&h->roundup_lock, NULL) != 0) {
        printf("\n mutex init failed\n");
        exit(1);
    }

    return h;
}
/*
 * Registers a new reader on fifo `h`.
 *
 * The returned stream starts reading at slot 0. Under `subscribe_lock` it is
 * appended to the fifo's subscriber list and counted; when the count reaches
 * `total_subscriber` the `subscribers_full` flag is raised.
 *
 * Returns the new stream. The fifo's list keeps a pointer to it. If the
 * stream cannot be allocated, a message is printed and the process exits
 * with status 1.
 */
stream *subscribe(fifo *h)
{
    stream *s = malloc(sizeof *s);
    if (s == NULL) {
        printf("\n out of memory\n");
        exit(1);
    }
    s->buffer = h;
    s->index = 0;

    pthread_mutex_lock(&h->subscribe_lock);
    append_list(h->subscribers, s);
    h->n_subscriber++;
    if (h->n_subscriber == h->total_subscriber) {
        h->subscribers_full = true;
    }
    pthread_mutex_unlock(&h->subscribe_lock);

    return s;
}
/*
 * Reports whether any stream in list `l` currently has its read index equal
 * to `needle`. Walks the list from head to tail without taking a lock.
 */
bool contains(list *l, uint needle)
{
    for (listobj *node = l->head; node != NULL; node = node->next) {
        if (node->s->index == needle) {
            return true;
        }
    }
    return false;
}
/*
 * Stores `v` in the slot at the fifo's write index and advances that index,
 * wrapping to 0 after slot N-1.
 *
 * While `ahead` is set and the write index is not the last slot, the value
 * is stored immediately. Otherwise the writer first yields until every
 * expected subscriber has registered, and then until no subscriber's read
 * index equals the slot after the write index.
 *
 * The order of the accesses to `ahead` and `index` is kept deliberately;
 * reader threads observe both.
 */
void stream_write(fifo *h, int v)
{
    bool fast_path = h->ahead && h->index < (N - 1);

    if (!fast_path) {
        while (!h->subscribers_full) {
            sched_yield();
        }
        uint next_slot = (h->index + 1) % N;
        while (contains(h->subscribers, next_slot)) {
            sched_yield();
        }
    }

    h->array[h->index] = v;

    if (h->index + 1 != N) {
        h->index++;
    } else {
        /* Wrapping: drop the fast path before publishing the new index. */
        h->ahead = false;
        h->index = 0;
    }
}
/*
 * Returns the next value for reader `s`, yielding until the fifo's write
 * index differs from the reader's index.
 *
 * After taking the value the reader's index advances by one. When it wraps
 * from slot N-1 to 0 the reader also bumps the fifo's `roundup` counter
 * under `roundup_lock`; the reader that brings the counter up to
 * `total_subscriber` resets it and sets `ahead` again.
 */
int stream_read(stream *s)
{
    fifo *buf = s->buffer;

    while (s->index == buf->index) {
        sched_yield();
    }

    int value = buf->array[s->index];

    if (s->index + 1 != N) {
        s->index++;
        return value;
    }

    /* Wrap first, then account for the completed lap. */
    s->index = 0;
    pthread_mutex_lock(&buf->roundup_lock);
    buf->roundup++;
    if (buf->roundup == buf->total_subscriber) {
        buf->roundup = 0;
        buf->ahead = true;
    }
    pthread_mutex_unlock(&buf->roundup_lock);

    return value;
}
/*
 * Marks fifo `h` as finished. Only the flag is set; nothing is freed and
 * values already written stay in the buffer.
 */
void stream_close(fifo *h)
{
    h->finished = true;
}
/*
 * Tells reader `s` whether its stream is over.
 *
 * Returns false as soon as at least one unread value is available, and true
 * once the fifo has been closed and this reader has consumed everything.
 * While the reader has caught up but the fifo is still open, the call
 * yields and re-checks instead of answering. A "false" answer in that state
 * would send the caller into stream_read(), which waits for a new value and
 * would never return if the writer closes without writing again.
 *
 * Assumes the writer does not call stream_write() after stream_close().
 */
bool stream_finished(stream *s)
{
    fifo *buf = s->buffer;

    for (;;) {
        /* Sample the close flag BEFORE comparing indices: if the fifo was
           already closed at this point, no later write can make the
           emptiness check stale. Checking in the opposite order could
           report "finished" while a last value was still unread. */
        bool closed = buf->finished;

        if (s->index != buf->index) {
            return false;
        }
        if (closed) {
            return true;
        }
        sched_yield();
    }
}
/*
 * Appends a new node referring to stream `s` at the tail of list `l`.
 *
 * The node is heap-allocated and owned by the list. If it cannot be
 * allocated, a message is printed and the process exits with status 1.
 * No locking is done here; subscribe() calls this while holding the fifo's
 * subscribe_lock.
 */
void append_list(list *l, stream *s)
{
    listobj *o = malloc(sizeof *o);
    if (o == NULL) {
        printf("\n out of memory\n");
        exit(1);
    }
    o->s = s;
    o->next = NULL;

    if (l->head == NULL) {
        /* First element: it is both head and tail. */
        l->head = l->tail = o;
    } else {
        l->tail->next = o;
        l->tail = o;
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment