Skip to content

Instantly share code, notes, and snippets.

@vwood
Created November 2, 2010 15:00
Show Gist options
  • Save vwood/659733 to your computer and use it in GitHub Desktop.
Save vwood/659733 to your computer and use it in GitHub Desktop.
Channel library.
#include <semaphore.h>
#include <pthread.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include "channel.h"
struct channel_s {
sem_t empty_semaphore;
sem_t fill_semaphore;
pthread_mutex_t mutex;
char *buffer;
int buffer_length;
size_t object_size;
int insert_point;
int remove_point;
};
void channel_init(channel_t **c, int buffer_length, size_t object_size) {
int error;
*c = malloc(sizeof **c);
if (*c == NULL) {
fprintf(stderr, "channel allocation error\n");
return;
}
(*c)->buffer_length = buffer_length;
(*c)->object_size = object_size;
(*c)->buffer = malloc(object_size * buffer_length);
(*c)->insert_point = 0;
(*c)->remove_point = 0;
error = pthread_mutex_init(&c->mutex, NULL);
if (error != 0)
fprintf(stderr, "channel error:%d\n", error);
// Semaphores ensure there are items/space to read/write
sem_init(&(*c)->empty_semaphore, 0, buffer_length);
sem_init(&(*c)->fill_semaphore, 0, 0);
}
void channel_add(channel_t *c, void *item) {
int error;
sem_wait(&c->empty_semaphore); // Reserve an empty space
error = pthread_mutex_lock(&c->mutex);
if (error != 0)
fprintf(stderr, "channel error:%d\n", error);
memcpy(c->buffer + c->insert_point, (const void *) item, c->object_size);
c->insert_point = (c->insert_point + 1) % c->buffer_length;
error = pthread_mutex_unlock(&c->mutex);
if (error != 0)
fprintf(stderr, "channel error:%d\n", error);
sem_post(&c->fill_semaphore); // Increase number of items
}
void channel_get(channel_t *c, void *space) {
int error;
sem_wait(&c->fill_semaphore); // Wait for an item
error = pthread_mutex_lock(&c->mutex);
if (error != 0)
fprintf(stderr, "channel error:%d\n", error);
memcpy(space, c->buffer + c->remove_point, c->object_size);
c->remove_point = (c->remove_point + 1) % c->buffer_length;
error = pthread_mutex_unlock(&c->mutex);
if (error != 0)
fprintf(stderr, "channel error:%d\n", error);
sem_post(&c->empty_semaphore); // Free the space
}
/*
Items still in buffer are forfeit
May cause memory to leak if pointers are involved.
*/
void channel_destroy(channel_t *c) {
int error;
free(c->buffer);
sem_destroy(&c->empty_semaphore);
sem_destroy(&c->fill_semaphore);
error = pthread_mutex_destroy(&c->mutex);
if (error != 0)
fprintf(stderr, "channel error:%d\n", error);
free(c);
}
#ifndef CHANNEL_H
#define CHANNEL_H
#include <stddef.h>
typedef struct channel_s channel_t;
void channel_init(channel_t **, int buffer_length, size_t object_size);
void channel_add(channel_t *, void *item);
void channel_get(channel_t *, void *space);
void channel_destroy(channel_t *);
#endif /* CHANNEL_H */
@vwood
Copy link
Author

vwood commented Nov 25, 2010

@vwood
Copy link
Author

vwood commented May 9, 2012

TODO:

  • Add reference counting to destroy and something else to increase count
  • be atomic with regards to destruction. register_consumer, deregister_consumer, open_channel, close_channel
  • also, cf: http://concurrencykit.org/doc/ck_ring.html
  • Probably don't limit the queue length to the semaphore count. Maybe I don't need the semaphore (replace with a condition variable to block on)
  • Careful about false sharing
  • Improve error reporting

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment