Skip to content

Instantly share code, notes, and snippets.

@avalanche123
Last active May 16, 2018 15:56
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save avalanche123/98cdbc79feb0feaea33f to your computer and use it in GitHub Desktop.
Save avalanche123/98cdbc79feb0feaea33f to your computer and use it in GitHub Desktop.
uv_promise_t proposal

Introduction

I'm working on a C library that features a background io thread and I think that libuv is a great choice for this task. The io thread will be responsible for handling a pool of tcp connections. The library will be used to write higher-level language bindings and will expose both tradition and asynchronous apis.

I know that a proper way to communicate with a libuv event loop from a different thread is by using uv_async_t object. However, there is, as far as I can tell, no primitive for communication with some other thread(s) from the event loop thread. For this exact reason, I propose a new primitiv - uv_promise_t. And I've included description of its api in this gist.

The goal is to make building apis like Cassandra Java Driver on top of libuv a breeze.

/*
* example demonstrating promises' usage, doesn't actually run as it relies on non-existent uv_channel_t and uv_promise_t,
* is missing some headers and is here for api usage demonstration purposes only
*/
typedef struct driver_s driver_t;
typedef struct request_s request_t;
void io_thread(void* arg);
struct driver_s {
uv_thread_t* io_tid;
uv_channel_t* pipe;
};
driver_t*
driver_new()
{
driver_t* self = calloc(1, sizeof(driver_t));
uv_channel_t* pipe = calloc(1, sizeof(uv_channel_t));
ASSERT(self);
ASSERT(pipe);
self->pipe = pipe;
ASSERT(uv_thread_create(self->io_tid, io_thread, pipe) == 0);
return self;
}
uv_promise_t*
driver_execute(driver_t* self, const char* statement)
{
uv_promise_t* promise = calloc(1, sizeof(uv_promise_t));
request_t* request = calloc(1, sizeof(request_t));
ASSERT(promise);
ASSERT(request);
uv_promise_init(promise);
request->statement = statement;
request->promise = promise;
uv_channel_send(self->pipe, request);
return promise;
}
void
driver_destroy(driver_t** self_p)
{
ASSERT(self_p)
if (*self_p) {
driver_t* self = *self_p;
uv_handle_close(self->pipe, free);
free(self);
*self_p = NULL;
}
}
void
on_timeout(uv_timer_t* handle)
{
ASSERT(handle);
(uv_promise_t*) promise = handle->data;
(int*) result = malloc(sizeof(int));
ASSERT(result);
*result = 5;
uv_promise_fulfil(promise, result);
}
struct request_s {
const char* statement;
uv_promise_t* promise;
};
void
handle_request(uv_channel_t* chan, void* arg)
{
request_t* request = (request_t*) arg;
printf(stdout, "io thread received request for statement %s",
(char*) request->statement);
// sleep for 3 seconds
uv_timer_t* timeout = calloc(1, sizeof(uv_timer_t));
ASSERT(timeout);
// promise will be resolved inside timeout handler
timeout->data = request->promise;
uv_timer_init(uv_loop_default(), timeout);
uv_timer_start(timeout, on_timeout, 3000, 0);
}
int main(int argc, char const *argv[])
{
driver_t* driver = driver_new();
promise_t* future = driver_execute("SELECT * FROM table");
promise_wait(future);
fprintf(stderr, "query finished\n");
promise_result_t result = promise_result(future);
if (result.status == UV_PROMISE_FULFILLED)
fprintf(stderr, "query returned %d\n", (int) *result.result);
else
fprintf(stderr, "query failed\n");
driver_destroy(&driver);
return 0;
}
void
io_thread(void* arg)
{
ASSERT(arg);
uv_loop_t* loop = uv_loop_default();
uv_channel_t* pipe = (uv_channel_t*) arg;
uv_channel_init(loop, pipe, handle_request);
uv_loop_run(loop);
}
typedef enum {
UV_PROMISE_BROKEN = -1,
UV_PROMISE_FULFILLED
} uv_promise_status;
typedef struct {
uv_promise_status status;
void* result;
} uv_promise_result_t;
typedef struct uv_promise_s uv_promise_t;
typedef struct uv_promise_result_s uv_promise_result_t;
typedef void (*uv_promise_cb)(uv_promise_t* self,
uv_promise_status status,
void* result);
/*
* uv_promise_t is a subclass of uv_handle_t
*
* uv_promise_t is the opposite of uv_async_t, and is used to send
* notifications to other theads from the event loop thread.
*/
struct uv_promise_s {
UV_HANDLE_FIELDS
uv_promise_cb promise_cb;
uv_promise_status status;
void* result;
UV_PROMISE_PRIVATE_FIELDS
};
UV_EXTERN int uv_promise_init(uv_loop_t*,
uv_promise_t* handle,
uv_promise_cb callback);
/*
* uv_promise_fulfil is used to fulfil a promise from inside the event loop
*
* returns 0 on success, UV_EINVAL if the promise has aleardy been fulfilled
* or broken.
*/
UV_EXTERN int uv_promise_fulfil(uv_promise_t* handle, void* result);
/*
* uv_promise_break is used to break a promise from inside the event loop
*
* returns 0 on success, UV_EINVAL if the promise has aleardy been fulfilled
* or broken.
*/
UV_EXTERN int uv_promise_break(uv_promise_t* handle, int status);
/*
* uv_promise_wait will block until a promise has been fulfilled or broken.
* uv_promise_result can be called after to obtain a uv_promise_result_t.
*/
UV_EXTERN void uv_promise_wait(uv_promise_t* handle);
/*
* same as uv_promise_wait, but returns EAGAIN if promise is not yet ready.
*/
UV_EXTERN int uv_promise_trywait(uv_promise_t* handle);
/*
* returns result of promise execution if one is available.
* for use after a successful call to uv_promise_wait or uv_promise_trywait.
*/
UV_EXTERN uv_promise_result_t uv_promise_result(uv_promise_t* handle);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment