Created
November 8, 2016 20:56
-
-
Save lxr/04ad44f4c12c4591ec9aabedac11fb27 to your computer and use it in GitHub Desktop.
libthread-style channels in pthreads
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
 * pthread_chan implements Plan 9 libthread-style channels using
 * pure pthreads.  Compared to other C channel libraries I've seen,
 * this one has the distinction of supporting blocking select and
 * deferred cancellation, albeit at a cost: all select operations are
 * serialized using a single process-wide lock.  This lock contention
 * percolates even to ordinary sends and receives, which are implemented
 * using pthread_chan_select.  This is unfortunate, but it's still the
 * cheapest way to implement blocking select in pthreads, as one can
 * only wait on a single condition variable-mutex pair at a time; the
 * alternative would be to kick off a dedicated wait thread for each
 * select operation.
 *
 * The send/recv/select interface also has the drawback that users are
 * required to provide their own condition variables.  This, too, is
 * necessary so that the functions can never fail because of an
 * allocation failure.
 *
 * pthread_chan requires C99 to compile, mostly due to the syntactic
 * sugar it enables.
 *
 * Copyright (c) 2016 Lari Rasku.  This code is released to the public
 * domain, or under CC0 if not applicable.
 */
#include <errno.h> | |
#include <stdlib.h> | |
#include <string.h> | |
#include "pthread_chan.h" | |
/*
 * Result of probing a select case with pthread_select_canexec, and the
 * action pthread_select_exec then performs for it.  PTHREAD_SELECT_NOOP
 * has to remain zero because pthread_chan_select tests the probe result
 * for truth.
 */
enum pthread_selop {
	PTHREAD_SELECT_NOOP = 0,	/* case cannot proceed right now */
	PTHREAD_SELECT_CLOSE = 1,	/* channel is closed: report chan->err */
	PTHREAD_SELECT_BUF = 2,		/* transfer through the channel buffer */
	PTHREAD_SELECT_NOBUF = 3	/* transfer directly with a queued peer */
};
/*
 * Argument bundle passed to pthread_select_cleanup via
 * pthread_cleanup_push: the case array of one pthread_chan_select call.
 */
struct pthread_selectargs {
	size_t n;			/* number of entries in cases */
	struct pthread_select *cases;	/* case array given to the select call */
};
/* | |
* The process-wide select lock. pthread_chan assumes that static | |
* initialization of this lock always succeeds, which is not true of all | |
* POSIX threads implementations; but such implementations are buggy, or | |
* at the very least ignorant of the feature's intended use [1]. | |
* | |
* [1]: http://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_mutex_destroy.html#tag_16_437_08_04 | |
*/ | |
static pthread_mutex_t pthread_chan_lock = PTHREAD_MUTEX_INITIALIZER; | |
static void pthread_select_cleanup(void *); | |
static void pthread_select_close(pthread_select_t *, int); | |
static enum pthread_selop pthread_select_canexec(pthread_select_t *); | |
static void pthread_select_exec(pthread_select_t *, enum pthread_selop); | |
static void pthread_chan_bufexec(pthread_chan_t *, enum pthread_chanop, void *); | |
static pthread_select_t *pthread_chan_drainq(pthread_chan_t *, enum pthread_chanop); | |
/* | |
* pthread_chan_init initializes a channel of nmemb elements of size | |
* bytes. If nmemb is zero, the channel is unbuffered. A size of zero | |
* is legal and useful when only synchronization is necessary. | |
* pthread_chan_init returns zero on success and ENOMEM on failure. | |
* Initializing an unbuffered channel cannot fail. | |
*/ | |
int | |
pthread_chan_init(pthread_chan_t *chan, size_t size, size_t nmemb) | |
{ | |
chan->err = 0; | |
chan->wid = size; | |
TAILQ_INIT(&chan->q[0]); | |
TAILQ_INIT(&chan->q[1]); | |
chan->buf = calloc(nmemb, size); | |
if (chan->buf == NULL && nmemb > 0) | |
return errno; | |
chan->off = chan->len = 0; | |
chan->cap = nmemb; | |
return 0; | |
} | |
/* | |
* pthread_chan_destroy frees the resources allocated for a channel. | |
* It always returns zero. | |
*/ | |
int | |
pthread_chan_destroy(pthread_chan_t *chan) | |
{ | |
free(chan->buf); | |
return 0; | |
} | |
/* | |
* pthread_chan_send sends the element pointed to by val over chan, | |
* blocking until chan has room to receive the element. It returns | |
* zero if the element was successfully sent, a positive error code | |
* if chan was or got closed, and a negative error code if the send | |
* failed. Sending can fail for any of the reasons pthread_cond_wait(3) | |
* can fail on your system. | |
* | |
* pthread_chan_send is a cancellation point. | |
*/ | |
int | |
pthread_chan_send(pthread_chan_t *chan, const void *val, pthread_cond_t *cond) | |
{ | |
pthread_select_t send = { PTHREAD_CHAN_SEND, chan, (void *)val }; | |
ssize_t err; | |
err = pthread_chan_select(&send, 1, cond); | |
return err == 0 ? send.ret : -err; | |
} | |
/* | |
* pthread_chan_recv waits for an element to arrive over chan and stores | |
* it in the memory pointed to by val. It returns zero if an element | |
* was successfully received, a positive error code if chan was or got | |
* closed, and a negative error code if the reception failed. Receiving | |
* can fail for any of the reasons pthread_cond_wait(3) can fail on your | |
* system. | |
* | |
* pthread_chan_recv is a cancellation point. | |
*/ | |
int | |
pthread_chan_recv(pthread_chan_t *chan, void *val, pthread_cond_t *cond) | |
{ | |
pthread_select_t recv = { PTHREAD_CHAN_RECV, chan, val }; | |
ssize_t err; | |
err = pthread_chan_select(&recv, 1, cond); | |
return err == 0 ? recv.ret : -err; | |
} | |
/* | |
* pthread_chan_close closes chan. All pending sends and recvs, and all | |
* future sends, will fail with value err. If chan is buffered, any | |
* elements in the buffer remain there until chan is destroyed or until | |
* they are read by pthread_chan_recv. pthread_chan_close returns | |
* zero on success, EPIPE if the channel is already closed, and EINVAL | |
* if err == 0. | |
*/ | |
int | |
pthread_chan_close(pthread_chan_t *chan, int err) | |
{ | |
pthread_select_t *s; | |
if (err == 0) | |
return EINVAL; | |
pthread_mutex_lock(&pthread_chan_lock); | |
if (chan->err != 0) { | |
err = EPIPE; | |
goto out; | |
} | |
chan->err = err; | |
while ((s = TAILQ_FIRST(&chan->q[0]))) { | |
TAILQ_REMOVE(&chan->q[0], s, waitq); | |
s->queued = 0; | |
pthread_select_close(s, err); | |
} | |
while ((s = TAILQ_FIRST(&chan->q[1]))) { | |
TAILQ_REMOVE(&chan->q[1], s, waitq); | |
s->queued = 0; | |
pthread_select_close(s, err); | |
} | |
err = 0; | |
out: | |
pthread_mutex_unlock(&pthread_chan_lock); | |
return err; | |
} | |
/* | |
* pthread_chan_select blocks until one of the n select cases can | |
* proceed, executes it, and returns its index. If several cases can | |
* proceed when pthread_chan_select is entered, one is chosen for | |
* execution by uniform random selection. pthread_chan_select returns | |
* a negative error code on failure; it can fail for any of the reasons | |
* your system's pthread_cond_wait(3) can fail. | |
* | |
* pthread_chan_select is a cancellation point. | |
*/ | |
ssize_t | |
pthread_chan_select(pthread_select_t *cases, size_t n, pthread_cond_t *cond) | |
{ | |
pthread_select_t *s, *choice = NULL; | |
size_t i, c = 0; | |
enum pthread_selop op, opchoice; | |
int err = 0; | |
pthread_mutex_lock(&pthread_chan_lock); | |
/* | |
* If any operations can be executed without blocking, take a | |
* uniform random selection of them, and execute and return the | |
* choice. | |
*/ | |
for (i = 0; i < n; i++) | |
if ((op = pthread_select_canexec(&cases[i]))) | |
if (rand() % ++c == 0) { | |
choice = &cases[i]; | |
opchoice = op; | |
} | |
if (c > 0) { | |
pthread_select_exec(choice, opchoice); | |
pthread_mutex_unlock(&pthread_chan_lock); | |
return choice - cases; | |
} | |
/* | |
* Otherwise, mark the operations as belonging to this select | |
* call and add them to their channels' respective queues. | |
* Wait in a loop until choice has been set. | |
*/ | |
for (i = 0; i < n; i++) { | |
s = &cases[i]; | |
s->cond = cond; | |
s->choice = &choice; | |
TAILQ_INSERT_TAIL(&s->chan->q[s->op], s, waitq); | |
s->queued = 1; | |
} | |
pthread_cleanup_push(pthread_select_cleanup, | |
(&(struct pthread_selectargs){ n, cases })); | |
while (choice == NULL && err == 0) | |
err = pthread_cond_wait(cond, &pthread_chan_lock); | |
pthread_cleanup_pop(1); | |
return err == 0 ? choice - cases : -err; | |
} | |
static void | |
pthread_select_cleanup(void *arg) | |
{ | |
struct pthread_selectargs *args = arg; | |
pthread_select_t *s; | |
size_t i; | |
for (i = 0; i < args->n; i++) { | |
s = &args->cases[i]; | |
if (!s->queued) | |
continue; | |
TAILQ_REMOVE(&s->chan->q[s->op], s, waitq); | |
} | |
pthread_mutex_unlock(&pthread_chan_lock); | |
} | |
static void | |
pthread_select_close(pthread_select_t *s, int err) | |
{ | |
s->ret = err; | |
if (*s->choice == NULL) { | |
*s->choice = s; | |
pthread_cond_signal(s->cond); | |
} | |
} | |
static enum pthread_selop | |
pthread_select_canexec(pthread_select_t *s) | |
{ | |
pthread_chan_t *chan = s->chan; | |
pthread_select_t *s2; | |
if (chan->err != 0 && (chan->len == 0 || s->op == PTHREAD_CHAN_SEND)) | |
return PTHREAD_SELECT_CLOSE; | |
if (chan->len > 0 && s->op == PTHREAD_CHAN_RECV) | |
return PTHREAD_SELECT_BUF; | |
if (chan->len < chan->cap && s->op == PTHREAD_CHAN_SEND) | |
return PTHREAD_SELECT_BUF; | |
s2 = pthread_chan_drainq(chan, !s->op); | |
if (s2 == NULL) | |
return PTHREAD_SELECT_NOOP; | |
TAILQ_INSERT_HEAD(&chan->q[!s->op], s2, waitq); | |
s2->queued = 1; | |
return PTHREAD_SELECT_NOBUF; | |
} | |
static void | |
pthread_select_exec(pthread_select_t *s, enum pthread_selop op) | |
{ | |
pthread_chan_t *chan = s->chan; | |
pthread_select_t *s2; | |
void *buf[2]; | |
switch (op) { | |
case PTHREAD_SELECT_NOOP: | |
/* NOTREACHED */ | |
case PTHREAD_SELECT_CLOSE: | |
s->ret = chan->err; | |
break; | |
case PTHREAD_SELECT_BUF: | |
pthread_chan_bufexec(chan, s->op, s->val); | |
s->ret = 0; | |
if ((s2 = pthread_chan_drainq(chan, !s->op))) { | |
pthread_chan_bufexec(chan, !s->op, s2->val); | |
pthread_select_close(s2, 0); | |
} | |
break; | |
case PTHREAD_SELECT_NOBUF: | |
s2 = pthread_chan_drainq(chan, !s->op); | |
buf[s->op] = s->val; | |
buf[s2->op] = s2->val; | |
memcpy(buf[PTHREAD_CHAN_RECV], buf[PTHREAD_CHAN_SEND], chan->wid); | |
s->ret = 0; | |
pthread_select_close(s2, 0); | |
break; | |
} | |
} | |
static void | |
pthread_chan_bufexec(pthread_chan_t *chan, enum pthread_chanop op, void *val) | |
{ | |
void *ptr; | |
switch (op) { | |
case PTHREAD_CHAN_SEND: | |
ptr = &chan->buf[(chan->off+chan->len)%chan->cap*chan->wid]; | |
memcpy(ptr, val, chan->wid); | |
chan->len++; | |
break; | |
case PTHREAD_CHAN_RECV: | |
ptr = &chan->buf[chan->off*chan->wid]; | |
memcpy(val, ptr, chan->wid); | |
chan->len--; | |
chan->off = (chan->off + 1) % chan->cap; | |
break; | |
} | |
} | |
static pthread_select_t * | |
pthread_chan_drainq(pthread_chan_t *chan, enum pthread_chanop op) | |
{ | |
pthread_select_t *s; | |
dequeue: | |
s = TAILQ_FIRST(&chan->q[op]); | |
if (s == NULL) | |
return NULL; | |
TAILQ_REMOVE(&chan->q[op], s, waitq); | |
s->queued = 0; | |
if (*s->choice != NULL) | |
goto dequeue; | |
return s; | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#ifndef _PTHREAD_CHAN_H_ | |
#define _PTHREAD_CHAN_H_ | |
#include <sys/types.h> | |
#include <sys/queue.h> | |
#include <pthread.h> | |
/* A channel; see struct pthread_chan. */
typedef struct pthread_chan	pthread_chan_t;
/* One send or receive case of a select; see struct pthread_select. */
typedef struct pthread_select	pthread_select_t;
/*
 * Direction of a channel operation.  The values double as indices into
 * the q[] array of struct pthread_chan, so they must stay 0 and 1.
 */
enum pthread_chanop {
	PTHREAD_CHAN_RECV = 0,	/* take an element from the channel */
	PTHREAD_CHAN_SEND = 1	/* put an element into the channel */
};
/*
 * A channel.  Initialize with pthread_chan_init and release with
 * pthread_chan_destroy.
 */
struct pthread_chan {
	int err;	/* error state; zero while the channel is open */
	size_t wid;	/* size of channel values, in bytes */
	/* pending cases, indexed by enum pthread_chanop */
	TAILQ_HEAD(, pthread_select) q[2];
	char *buf;	/* ring buffer holding cap slots of wid bytes */
	size_t off;	/* slot index of the oldest buffered value */
	size_t len;	/* number of values currently buffered */
	size_t cap;	/* number of slots; zero for an unbuffered channel */
};
/*
 * One case of a pthread_chan_select call.  The caller fills in op, chan
 * and val, and reads the outcome from ret afterwards.  The field order
 * matters: pthread_chan_send and pthread_chan_recv initialize the first
 * three members positionally.
 */
struct pthread_select {
	unsigned int op:1;		/* op type: PTHREAD_CHAN_RECV or _SEND */
	struct pthread_chan *chan;	/* target channel */
	void *val;			/* pointer to value to send/recv */
	int ret;			/* return value: zero, or the close value */

	/* internal use fields; do not touch these */
	pthread_cond_t *cond;		/* condition variable of the select call */
	struct pthread_select **choice;	/* where that call's chosen case goes */
	TAILQ_ENTRY(pthread_select) waitq;	/* link in chan->q[op] */
	int queued;			/* nonzero while linked into waitq */
};
__BEGIN_DECLS | |
int pthread_chan_init(pthread_chan_t *, size_t, size_t); | |
int pthread_chan_destroy(pthread_chan_t *); | |
int pthread_chan_send(pthread_chan_t *, const void *, pthread_cond_t *); | |
int pthread_chan_recv(pthread_chan_t *, void *, pthread_cond_t *); | |
int pthread_chan_close(pthread_chan_t *, int); | |
ssize_t pthread_chan_select(pthread_select_t *, size_t, pthread_cond_t *); | |
__END_DECLS | |
#endif /* _PTHREAD_CHAN_H_ */ |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment