Skip to content

Instantly share code, notes, and snippets.

@lxr
Created November 8, 2016 20:56
Show Gist options
  • Save lxr/04ad44f4c12c4591ec9aabedac11fb27 to your computer and use it in GitHub Desktop.
Save lxr/04ad44f4c12c4591ec9aabedac11fb27 to your computer and use it in GitHub Desktop.
libthread-style channels in pthreads
/*
* pthread_chan implements Plan 9 libthread-style channels using
* pure pthreads. Compared to other C channel libraries I've seen,
* this one has the distinction of supporting blocking select and
* deferred cancellation, albeit at a cost: all select operations are
* serialized using a single process-wide lock. This lock contention
* percolates even to ordinary sends and receives, which are implemented
* using pthread_chan_select. This is unfortunate, but it's still the
* cheapest way to implement blocking select in pthreads, as one can
* only wait on a single condition variable-mutex pair at a time; the
* alternative would be to kick off a dedicated wait thread for each
* select operation.
*
* The send/recv/select interface also has the hangup that users are
* required to provide their own condition variables. This, too, is
* necessary in order to make the functions impossible to fail due to
* allocation failures.
*
* pthread_chan requires C99 to compile, mostly due to the syntactic
* sugar it enables.
*
* Copyright (c) 2016 Lari Rasku. This code is released to the public
* domain, or under CC0 if not applicable.
*/
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include "pthread_chan.h"
enum pthread_selop {
PTHREAD_SELECT_NOOP,
PTHREAD_SELECT_CLOSE,
PTHREAD_SELECT_BUF,
PTHREAD_SELECT_NOBUF,
};
struct pthread_selectargs {
size_t n;
pthread_select_t *cases;
};
/*
* The process-wide select lock. pthread_chan assumes that static
* initialization of this lock always succeeds, which is not true of all
* POSIX threads implementations; but such implementations are buggy, or
* at the very least ignorant of the feature's intended use [1].
*
* [1]: http://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_mutex_destroy.html#tag_16_437_08_04
*/
static pthread_mutex_t pthread_chan_lock = PTHREAD_MUTEX_INITIALIZER;
static void pthread_select_cleanup(void *);
static void pthread_select_close(pthread_select_t *, int);
static enum pthread_selop pthread_select_canexec(pthread_select_t *);
static void pthread_select_exec(pthread_select_t *, enum pthread_selop);
static void pthread_chan_bufexec(pthread_chan_t *, enum pthread_chanop, void *);
static pthread_select_t *pthread_chan_drainq(pthread_chan_t *, enum pthread_chanop);
/*
* pthread_chan_init initializes a channel of nmemb elements of size
* bytes. If nmemb is zero, the channel is unbuffered. A size of zero
* is legal and useful when only synchronization is necessary.
* pthread_chan_init returns zero on success and ENOMEM on failure.
* Initializing an unbuffered channel cannot fail.
*/
int
pthread_chan_init(pthread_chan_t *chan, size_t size, size_t nmemb)
{
chan->err = 0;
chan->wid = size;
TAILQ_INIT(&chan->q[0]);
TAILQ_INIT(&chan->q[1]);
chan->buf = calloc(nmemb, size);
if (chan->buf == NULL && nmemb > 0)
return errno;
chan->off = chan->len = 0;
chan->cap = nmemb;
return 0;
}
/*
* pthread_chan_destroy frees the resources allocated for a channel.
* It always returns zero.
*/
int
pthread_chan_destroy(pthread_chan_t *chan)
{
free(chan->buf);
return 0;
}
/*
* pthread_chan_send sends the element pointed to by val over chan,
* blocking until chan has room to receive the element. It returns
* zero if the element was successfully sent, a positive error code
* if chan was or got closed, and a negative error code if the send
* failed. Sending can fail for any of the reasons pthread_cond_wait(3)
* can fail on your system.
*
* pthread_chan_send is a cancellation point.
*/
int
pthread_chan_send(pthread_chan_t *chan, const void *val, pthread_cond_t *cond)
{
pthread_select_t send = { PTHREAD_CHAN_SEND, chan, (void *)val };
ssize_t err;
err = pthread_chan_select(&send, 1, cond);
return err == 0 ? send.ret : -err;
}
/*
* pthread_chan_recv waits for an element to arrive over chan and stores
* it in the memory pointed to by val. It returns zero if an element
* was successfully received, a positive error code if chan was or got
* closed, and a negative error code if the reception failed. Receiving
* can fail for any of the reasons pthread_cond_wait(3) can fail on your
* system.
*
* pthread_chan_recv is a cancellation point.
*/
int
pthread_chan_recv(pthread_chan_t *chan, void *val, pthread_cond_t *cond)
{
pthread_select_t recv = { PTHREAD_CHAN_RECV, chan, val };
ssize_t err;
err = pthread_chan_select(&recv, 1, cond);
return err == 0 ? recv.ret : -err;
}
/*
* pthread_chan_close closes chan. All pending sends and recvs, and all
* future sends, will fail with value err. If chan is buffered, any
* elements in the buffer remain there until chan is destroyed or until
* they are read by pthread_chan_recv. pthread_chan_close returns
* zero on success, EPIPE if the channel is already closed, and EINVAL
* if err == 0.
*/
int
pthread_chan_close(pthread_chan_t *chan, int err)
{
pthread_select_t *s;
if (err == 0)
return EINVAL;
pthread_mutex_lock(&pthread_chan_lock);
if (chan->err != 0) {
err = EPIPE;
goto out;
}
chan->err = err;
while ((s = TAILQ_FIRST(&chan->q[0]))) {
TAILQ_REMOVE(&chan->q[0], s, waitq);
s->queued = 0;
pthread_select_close(s, err);
}
while ((s = TAILQ_FIRST(&chan->q[1]))) {
TAILQ_REMOVE(&chan->q[1], s, waitq);
s->queued = 0;
pthread_select_close(s, err);
}
err = 0;
out:
pthread_mutex_unlock(&pthread_chan_lock);
return err;
}
/*
* pthread_chan_select blocks until one of the n select cases can
* proceed, executes it, and returns its index. If several cases can
* proceed when pthread_chan_select is entered, one is chosen for
* execution by uniform random selection. pthread_chan_select returns
* a negative error code on failure; it can fail for any of the reasons
* your system's pthread_cond_wait(3) can fail.
*
* pthread_chan_select is a cancellation point.
*/
ssize_t
pthread_chan_select(pthread_select_t *cases, size_t n, pthread_cond_t *cond)
{
pthread_select_t *s, *choice = NULL;
size_t i, c = 0;
enum pthread_selop op, opchoice;
int err = 0;
pthread_mutex_lock(&pthread_chan_lock);
/*
* If any operations can be executed without blocking, take a
* uniform random selection of them, and execute and return the
* choice.
*/
for (i = 0; i < n; i++)
if ((op = pthread_select_canexec(&cases[i])))
if (rand() % ++c == 0) {
choice = &cases[i];
opchoice = op;
}
if (c > 0) {
pthread_select_exec(choice, opchoice);
pthread_mutex_unlock(&pthread_chan_lock);
return choice - cases;
}
/*
* Otherwise, mark the operations as belonging to this select
* call and add them to their channels' respective queues.
* Wait in a loop until choice has been set.
*/
for (i = 0; i < n; i++) {
s = &cases[i];
s->cond = cond;
s->choice = &choice;
TAILQ_INSERT_TAIL(&s->chan->q[s->op], s, waitq);
s->queued = 1;
}
pthread_cleanup_push(pthread_select_cleanup,
(&(struct pthread_selectargs){ n, cases }));
while (choice == NULL && err == 0)
err = pthread_cond_wait(cond, &pthread_chan_lock);
pthread_cleanup_pop(1);
return err == 0 ? choice - cases : -err;
}
static void
pthread_select_cleanup(void *arg)
{
struct pthread_selectargs *args = arg;
pthread_select_t *s;
size_t i;
for (i = 0; i < args->n; i++) {
s = &args->cases[i];
if (!s->queued)
continue;
TAILQ_REMOVE(&s->chan->q[s->op], s, waitq);
}
pthread_mutex_unlock(&pthread_chan_lock);
}
static void
pthread_select_close(pthread_select_t *s, int err)
{
s->ret = err;
if (*s->choice == NULL) {
*s->choice = s;
pthread_cond_signal(s->cond);
}
}
static enum pthread_selop
pthread_select_canexec(pthread_select_t *s)
{
pthread_chan_t *chan = s->chan;
pthread_select_t *s2;
if (chan->err != 0 && (chan->len == 0 || s->op == PTHREAD_CHAN_SEND))
return PTHREAD_SELECT_CLOSE;
if (chan->len > 0 && s->op == PTHREAD_CHAN_RECV)
return PTHREAD_SELECT_BUF;
if (chan->len < chan->cap && s->op == PTHREAD_CHAN_SEND)
return PTHREAD_SELECT_BUF;
s2 = pthread_chan_drainq(chan, !s->op);
if (s2 == NULL)
return PTHREAD_SELECT_NOOP;
TAILQ_INSERT_HEAD(&chan->q[!s->op], s2, waitq);
s2->queued = 1;
return PTHREAD_SELECT_NOBUF;
}
static void
pthread_select_exec(pthread_select_t *s, enum pthread_selop op)
{
pthread_chan_t *chan = s->chan;
pthread_select_t *s2;
void *buf[2];
switch (op) {
case PTHREAD_SELECT_NOOP:
/* NOTREACHED */
case PTHREAD_SELECT_CLOSE:
s->ret = chan->err;
break;
case PTHREAD_SELECT_BUF:
pthread_chan_bufexec(chan, s->op, s->val);
s->ret = 0;
if ((s2 = pthread_chan_drainq(chan, !s->op))) {
pthread_chan_bufexec(chan, !s->op, s2->val);
pthread_select_close(s2, 0);
}
break;
case PTHREAD_SELECT_NOBUF:
s2 = pthread_chan_drainq(chan, !s->op);
buf[s->op] = s->val;
buf[s2->op] = s2->val;
memcpy(buf[PTHREAD_CHAN_RECV], buf[PTHREAD_CHAN_SEND], chan->wid);
s->ret = 0;
pthread_select_close(s2, 0);
break;
}
}
static void
pthread_chan_bufexec(pthread_chan_t *chan, enum pthread_chanop op, void *val)
{
void *ptr;
switch (op) {
case PTHREAD_CHAN_SEND:
ptr = &chan->buf[(chan->off+chan->len)%chan->cap*chan->wid];
memcpy(ptr, val, chan->wid);
chan->len++;
break;
case PTHREAD_CHAN_RECV:
ptr = &chan->buf[chan->off*chan->wid];
memcpy(val, ptr, chan->wid);
chan->len--;
chan->off = (chan->off + 1) % chan->cap;
break;
}
}
static pthread_select_t *
pthread_chan_drainq(pthread_chan_t *chan, enum pthread_chanop op)
{
pthread_select_t *s;
dequeue:
s = TAILQ_FIRST(&chan->q[op]);
if (s == NULL)
return NULL;
TAILQ_REMOVE(&chan->q[op], s, waitq);
s->queued = 0;
if (*s->choice != NULL)
goto dequeue;
return s;
}
#ifndef _PTHREAD_CHAN_H_
#define _PTHREAD_CHAN_H_
#include <sys/types.h>
#include <sys/queue.h>
#include <pthread.h>
typedef struct pthread_chan pthread_chan_t;
typedef struct pthread_select pthread_select_t;
enum pthread_chanop {
PTHREAD_CHAN_RECV = 0,
PTHREAD_CHAN_SEND = 1,
};
struct pthread_chan {
int err; /* error state */
size_t wid; /* size of channel values */
TAILQ_HEAD(, pthread_select) q[2]; /* pending sends/recvs */
char *buf;
size_t off, len, cap;
};
struct pthread_select {
unsigned int op:1; /* op type */
pthread_chan_t *chan; /* target channel */
void *val; /* pointer to value to send/recv */
int ret; /* return value */
/* internal use fields; do not touch these */
pthread_cond_t *cond;
pthread_select_t **choice;
TAILQ_ENTRY(pthread_select) waitq;
int queued;
};
__BEGIN_DECLS
int pthread_chan_init(pthread_chan_t *, size_t, size_t);
int pthread_chan_destroy(pthread_chan_t *);
int pthread_chan_send(pthread_chan_t *, const void *, pthread_cond_t *);
int pthread_chan_recv(pthread_chan_t *, void *, pthread_cond_t *);
int pthread_chan_close(pthread_chan_t *, int);
ssize_t pthread_chan_select(pthread_select_t *, size_t, pthread_cond_t *);
__END_DECLS
#endif /* _PTHREAD_CHAN_H_ */
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment