Skip to content

Instantly share code, notes, and snippets.

@vwood
Forked from vwood/channel.c
Created November 17, 2010 03:33
Show Gist options
  • Save vwood/702933 to your computer and use it in GitHub Desktop.
Save vwood/702933 to your computer and use it in GitHub Desktop.
Channels with asynchronous support.
#include <semaphore.h>
#include <pthread.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include "channel.h"
struct channel_s {
sem_t empty_semaphore;
sem_t fill_semaphore;
pthread_mutex_t mutex;
char *buffer;
int buffer_length;
size_t object_size;
int insert_point;
int remove_point;
};
void channel_init(channel_t **c, int buffer_length, size_t object_size) {
int error;
*c = malloc(sizeof **c);
if (*c == NULL) {
fprintf(stderr, "channel allocation error\n");
return;
}
(*c)->buffer_length = buffer_length;
(*c)->object_size = object_size;
(*c)->buffer = malloc(object_size * buffer_length);
(*c)->insert_point = 0;
(*c)->remove_point = 0;
error = pthread_mutex_init(&c->mutex, NULL);
if (error != 0)
fprintf(stderr, "channel error:%d\n", error);
// Semaphores ensure there are items/space to read/write
sem_init(&(*c)->empty_semaphore, 0, buffer_length);
sem_init(&(*c)->fill_semaphore, 0, 0);
}
void channel_add(channel_t *c, void *item) {
int error;
sem_wait(&c->empty_semaphore); // Reserve an empty space
error = pthread_mutex_lock(&c->mutex);
if (error != 0)
fprintf(stderr, "channel error:%d\n", error);
memcpy(c->buffer + c->insert_point, (const void *) item, c->object_size);
c->insert_point = (c->insert_point + 1) % c->buffer_length;
error = pthread_mutex_unlock(&c->mutex);
if (error != 0)
fprintf(stderr, "channel error:%d\n", error);
sem_post(&c->fill_semaphore); // Increase number of items
}
/* Nonblocking, returns 1 on Success, 0 on Failure*/
int channel_tryadd(channel_t *c, void *item) {
int error;
if (sem_trywait(&c->empty_semaphore) != 0) // Attempt to reserve an empty space
return 0;
error = pthread_mutex_lock(&c->mutex);
if (error != 0)
fprintf(stderr, "channel error:%d\n", error);
memcpy(c->buffer + c->insert_point, (char *) item, c->object_size);
c->insert_point = (c->insert_point + 1) % c->buffer_length;
error = pthread_mutex_unlock(&c->mutex);
if (error != 0)
fprintf(stderr, "channel error:%d\n", error);
sem_post(&c->fill_semaphore); // Increase number of items
return 1;
}
void channel_get(channel_t *c, void *space) {
int error;
sem_wait(&c->fill_semaphore); // Wait for an item
error = pthread_mutex_lock(&c->mutex);
if (error != 0)
fprintf(stderr, "channel error:%d\n", error);
memcpy(space, c->buffer + c->remove_point, c->object_size);
c->remove_point = (c->remove_point + 1) % c->buffer_length;
error = pthread_mutex_unlock(&c->mutex);
if (error != 0)
fprintf(stderr, "channel error:%d\n", error);
sem_post(&c->empty_semaphore); // Free the space
}
/* Nonblocking, Returns 1 on Success, 0 on Failure */
int channel_tryget(channel_t *c, void *space) {
int error;
if (sem_trywait(&c->fill_semaphore) != 0) // Check for an item
return 0;
error = pthread_mutex_lock(&c->mutex);
if (error != 0)
fprintf(stderr, "channel error:%d\n", error);
memcpy(space, c->buffer + c->remove_point, c->object_size);
c->remove_point = (c->remove_point + 1) % c->buffer_length;
error = pthread_mutex_unlock(&c->mutex);
if (error != 0)
fprintf(stderr, "channel error:%d\n", error);
sem_post(&c->empty_semaphore); // Free the space
return 1;
}
/*
Items still in buffer are forfeit
May cause memory to leak if pointers are involved.
*/
void channel_destroy(channel_t *c) {
int error;
free(c->buffer);
sem_destroy(&c->empty_semaphore);
sem_destroy(&c->fill_semaphore);
error = pthread_mutex_destroy(&c->mutex);
if (error != 0)
fprintf(stderr, "channel error:%d\n", error);
free(c);
}
#ifndef CHANNEL_H
#define CHANNEL_H
#include <stddef.h>
typedef struct channel_s channel_t;
void channel_init(channel_t **, int buffer_length, size_t object_size);
void channel_add(channel_t *, void *item);
int channel_tryadd(channel_t *, void *item);
void channel_get(channel_t *, void *space);
int channel_tryget(channel_t *, void *space);
void channel_destroy(channel_t *);
#endif /* CHANNEL_H */
@vwood
Copy link
Author

vwood commented Nov 25, 2010

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment