Skip to content

Instantly share code, notes, and snippets.

@leiless
Created July 11, 2020 07:55
Show Gist options
  • Save leiless/0a33314cd5ea046a5841ac3287620642 to your computer and use it in GitHub Desktop.
Save leiless/0a33314cd5ea046a5841ac3287620642 to your computer and use it in GitHub Desktop.
libuv pipe IPC echo client/server example
/*
* Created Jul 4, 2020.
*/
#include <unistd.h>
#include <uv.h>
#include "test.h"
#include "utils.h"
#include "assertf.h"
#include "uv_utils.h"
/* Number of IPC client threads; also sizes the `threads' array below. */
#define NR_THREAD 3
/* Filesystem path the server pipe is bound to and the client pipes connect to. */
#define IPC_PATH "/tmp/.uv.ipc.sock"
/* Listening pipe handle of the IPC server; initialized and bound in test_main(). */
static uv_pipe_t *ipc_server = &(uv_pipe_t){};
/* Client thread handles: created by thread_init(), joined by thread_join(). */
static uv_thread_t threads[NR_THREAD];
/* Size in bytes of the buffer ipc_read_cb() hands to read(2). */
#define READ_BUFSZ 1024
/*
 * Poll callback for an IPC client's socket descriptor.
 *
 * handle: the poll handle; handle->data is the `ipc_client' pipe set up in
 *         thread_cb().
 * status: a negative libuv error code when polling failed.
 * events: bitmask of the events that fired (UV_READABLE / UV_DISCONNECT).
 *
 * On disconnect both the poll handle and the client pipe are closed; otherwise
 * one chunk of pending data is read and echoed to stderr.
 */
static void ipc_read_cb(uv_poll_t *handle, int status, int events)
{
    if (events & UV_DISCONNECT) {
        LOG_WARN("Connection reset by IPC server");
        int e = uv_poll_stop(handle);
        assert_eqf(e, 0, "%d", "uv_poll_stop() fail uv error: %s", UVE(e));
        /*
         * In order to safely close the loop, you need to uv_close() the
         * uv_pipe_t while the loop is active (before uv_run() returns)
         * see: https://github.com/libuv/help/issues/156
         */
        uv_close((uv_handle_t *) handle, NULL);
        /* handle->data is the `ipc_client' in thread_cb() */
        uv_close((uv_handle_t *) handle->data, NULL);
        return;
    }

    if (status < 0) {
        LOG_ERR("uv poll error: %s", UVE(status));
        return;
    }

    int fd;
    int e = uv_fileno((uv_handle_t *) handle, &fd);
    assert_eq(e, 0, "%d");

    /*
     * The buffer is automatic on purpose: this callback is registered by every
     * client thread, so a `static' buffer would be written concurrently by
     * several event loops.
     */
    char buffer[READ_BUFSZ];
    ssize_t nread;

    /* Retry the read when it is interrupted by a signal. */
    do {
        nread = read(fd, buffer, sizeof(buffer));
    } while (nread < 0 && errno == EINTR);

    if (nread < 0) {
        LOG_ERR("read(2) fail fd: %d errno: %d", fd, errno);
    } else if (nread == 0) {
        LOG_ERR("Connection reset by peer");
    } else {
        (void) fprintf(stderr, "%.*s", (int) nread, buffer);
    }
}
/*
 * Connect callback of a client pipe.
 *
 * req:    the connect request; req->handle is the client pipe and req->data
 *         the poll_ctx_t prepared in thread_cb().
 * status: a negative libuv error code when the connection attempt failed.
 *
 * On success, starts polling the pipe's file descriptor for readable and
 * disconnect events with ipc_read_cb().
 */
static void ipc_client_connect_cb(uv_connect_t *req, int status)
{
    if (status < 0) {
        LOG_ERR("uv_connect() fail uv error: %s", UVE(status));
        /*
         * Close the pipe, otherwise the handle stays open after uv_run()
         * returns and uv_loop_close() in thread_cb() cannot succeed.
         */
        uv_close((uv_handle_t *) req->handle, NULL);
        return;
    }

    int fd;
    int e = uv_fileno((uv_handle_t *) req->handle, &fd);
    assert_eq(e, 0, "%d");

    poll_ctx_t *p = req->data;
    /* req->handle->loop is the `thread_loop' */
    e = uv_util_poll_init_start(p, req->handle->loop, fd, UV_READABLE | UV_DISCONNECT, ipc_read_cb);
    assert_eq(e, 0, "%d");
}
/*
 * Entry point of an IPC client thread.
 *
 * Sets up a private event loop, connects a pipe to IPC_PATH and runs the loop
 * until it has nothing left to do, then closes the loop.
 *
 * arg: must be NULL.
 */
static void thread_cb(void *arg)
{
    assert_null(arg);

    /* Private loop of this thread. */
    uv_loop_t loop_storage = {0};
    int e = uv_loop_init(&loop_storage);
    assert_eqf(e, 0, "%d", "uv_loop_init() fail uv error: %s", UVE(e));

    /* Client end of the IPC connection. */
    uv_pipe_t client_pipe = {0};
    e = uv_pipe_init(&loop_storage, &client_pipe, 0);
    assert_eq(e, 0, "%d");

    /* The poll context reaches the connect callback through the request. */
    poll_ctx_t poll_ctx;
    poll_ctx.handle.data = &client_pipe;

    uv_connect_t connect_req = {0};
    connect_req.data = &poll_ctx;
    uv_pipe_connect(&connect_req, &client_pipe, IPC_PATH, ipc_client_connect_cb);

    e = uv_run(&loop_storage, UV_RUN_DEFAULT);
    assert_eq(e, 0, "%d");

    e = uv_loop_close(&loop_storage);
    assert_eqf(e, 0, "%d", "uv_loop_close() fail error: %s", UVE(e));
}
static void thread_init(void)
{
int e;
LOG("Waiting IPC clients...");
for (size_t i = 0; i < ARRAY_SIZE(threads); i++) {
e = uv_thread_create(&threads[i], thread_cb, NULL);
assert_eqf(e, 0, "%d", " uv_thread_create() fail uv error: %s", UVE(e));
}
}
static void thread_join(void)
{
int e;
for (size_t i = 0; i < ARRAY_SIZE(threads); i++) {
e = uv_thread_join(&threads[i]);
assert_eqf(e, 0, "%d", "uv_thread_join() fail uv error: %s", UVE(e));
}
}
/*
 * Round robin index: slot of `ipc_client' that receives the next write.
 * Writers advance it and wrap it back to 0 once it reaches NR_THREAD.
 */
static uint32_t rr_index = 0;
/* Server-side pipe handles of the accepted connections, filled by ipc_listen_cb(). */
static uv_pipe_t ipc_client[NR_THREAD];
/*
 * Selects the data source compiled in: non-zero builds fdgets() and
 * stdin_read_cb() (lines read from stdin), zero builds ipc_echo().
 */
#define USE_STDIO 0
#if USE_STDIO
/*
 * Read one line from a file descriptor, one byte at a time.
 *
 * s:    destination buffer.
 * size: capacity of `s' in bytes; at most size - 1 bytes are stored and, when
 *       size is non-zero, the result is NUL-terminated.
 * fd:   descriptor to read from.
 *
 * Reading stops after a newline (which is stored), when the buffer is full,
 * at end of file, or on a read error.
 *
 * Returns the number of bytes stored, or the negative read(2) result when the
 * last read failed.
 */
static ssize_t fdgets(char *s, size_t size, int fd)
{
    size_t len = 0;
    ssize_t rc = 0;

    for (;;) {
        char ch;

        /* Always keep one byte free for the terminator. */
        if (len + 1 >= size) {
            break;
        }
        rc = read(fd, &ch, 1);
        if (rc <= 0) {
            break;
        }
        s[len++] = ch;
        if (ch == '\n') {
            break;
        }
    }

    if (size != 0) {
        s[len] = '\0';
    }
    if (rc < 0) {
        return rc;
    }
    return (ssize_t) len;
}
/*
 * Poll callback for stdin; called in main thread.
 *
 * Reads one line from stdin and writes it to the next IPC client in round
 * robin order. At end of input, stops polling and closes the poll handle,
 * all client pipes and the server pipe. Read errors are ignored.
 *
 * handle: the poll handle watching stdin.
 * status: a negative libuv error code when polling failed.
 * events: expected to be exactly UV_READABLE.
 */
static void stdin_read_cb(uv_poll_t *handle, int status, int events)
{
    if (status < 0) {
        LOG_ERR("uv poll error: %s", UVE(status));
        return;
    }
    assert_eq(events, UV_READABLE, "%#x");

    int fd;
    int e = uv_fileno((uv_handle_t *) handle, &fd);
    assert_eq(e, 0, "%d");
    assert_eq(fd, STDIN_FILENO, "%d");

    static char buffer[4096];
    ssize_t n = fdgets(buffer, sizeof(buffer), fd);
    if (n > 0) {
        static uv_write_t write_req;
        uv_buf_t buf = uv_buf_init(buffer, n);

        /*
         * Remember the target slot before advancing the round robin index, so
         * a failure is reported against the client that was written to (also
         * when the index has just wrapped back to 0).
         */
        const uint32_t target = rr_index;
        uv_stream_t *stream = (uv_stream_t *) &ipc_client[target];
        if (++rr_index == NR_THREAD) {
            rr_index = 0;
        }

        e = uv_write(&write_req, stream, &buf, 1, NULL);
        if (e != 0) {
            if (e == UV_EPIPE) {
                LOG_ERR("Broken pipe i: %u", (unsigned) target);
            } else {
                panicf("uv_write() fail uv error: %s", UVE(e));
            }
        }
    } else if (n == 0) {
        LOG_DBG("Met EOF in stdin");
        e = uv_poll_stop(handle);
        assert_eqf(e, 0, "%d", "uv_poll_stop() fail uv error: %s", UVE(e));
        uv_close((uv_handle_t *) handle, NULL);
        for (size_t i = 0; i < ARRAY_SIZE(ipc_client); i++) {
            uv_close((uv_handle_t *) &ipc_client[i], NULL);
        }
        uv_close((uv_handle_t *) ipc_server, NULL);
    } else {
        /* Simply ignore if there is any error */
    }
}
#else
/*
 * Endlessly write "+" to the IPC clients in round robin order, sleeping one
 * millisecond after each write. Never returns.
 *
 * A write that fails with UV_EPIPE is logged; any other failure panics.
 */
static void ipc_echo(void)
{
    static uv_write_t write_req;
    char *buffer = "+";
    size_t n = strlen(buffer);
    uv_buf_t buf = uv_buf_init(buffer, n);

    while (1) {
        /*
         * Remember the target slot before advancing the round robin index, so
         * a failure is reported against the client that was written to (also
         * when the index has just wrapped back to 0).
         */
        const uint32_t target = rr_index;
        uv_stream_t *stream = (uv_stream_t *) &ipc_client[target];
        if (++rr_index == NR_THREAD) {
            rr_index = 0;
        }

        int e = uv_write(&write_req, stream, &buf, 1, NULL);
        if (e != 0) {
            if (e == UV_EPIPE) {
                LOG_ERR("Broken pipe i: %u", (unsigned) target);
            } else {
                panicf("uv_write() fail uv error: %s", UVE(e));
            }
        }

        /* XXX: In case of IPC client send too soon(need confirmation)? */
        ms_sleep(1);
    }
}
#endif
/**
 * Connection callback of the IPC server pipe; called in main thread.
 *
 * Accepts the pending connection into the next free slot of `ipc_client'.
 * When the NR_THREAD-th client has been accepted, the data source selected
 * by USE_STDIO is started.
 *
 * @param server    the listening stream, expected to be `ipc_server'
 * @param status    a negative libuv error code when the connection failed
 */
static void ipc_listen_cb(uv_stream_t *server, int status)
{
    /* Number of clients accepted so far; persists across invocations. */
    static size_t i = 0;
    assert_lt(i, NR_THREAD, "%zu");
    assert_eq(server, ipc_server, "%p");

    if (status < 0) {
        LOG_ERR("uv listen error: %s", UVE(status));
        return;
    }

    uv_pipe_t *slot = &ipc_client[i];
    int e = uv_pipe_init(server->loop, slot, 0);
    assert_eqf(e, 0, "%d", "uv_pipe_init() fail uv error: %s", UVE(e));
    e = uv_accept(server, (uv_stream_t *) slot);
    assert_eqf(e, 0, "%d", "uv_accept() fail uv error: %s", UVE(e));

    /* Still waiting for more clients. */
    if (++i != NR_THREAD) {
        return;
    }

#if USE_STDIO
    LOG("All IPC clients connected, ready to poll stdin input...");
    static poll_ctx_t poll_ctx;
    e = uv_util_poll_init_start(&poll_ctx, server->loop, STDIN_FILENO, UV_READABLE, stdin_read_cb);
    assert_eqf(e, 0, "%d", "uv_util_poll_init_start() fail uv error: %s", UVE(e));
#else
    LOG("All IPC clients connected");
    ipc_echo();
#endif
}
/*
 * Test entry point.
 *
 * Binds the IPC server pipe to IPC_PATH on the default loop, starts the
 * client threads, runs the loop until it has nothing left to do, then joins
 * the threads and closes the loop.
 *
 * Returns 0; failures are reported through the assertion macros.
 */
int test_main(void)
{
    LOG("%s", uv_version_string());

    uv_loop_t *main_loop = uv_default_loop();
    assert_nonnull(main_loop);

    int e = uv_pipe_init(main_loop, ipc_server, 0);
    assert_eqf(e, 0, "%d", "uv_pipe_init() fail uv error: %s", UVE(e));

    /* Remove the socket file first; a missing file is not an error. */
    uv_fs_t *fs = &(uv_fs_t){};
    e = uv_fs_unlink(main_loop, fs, IPC_PATH, NULL);
    /* Every finished fs request has to be handed back to libuv. */
    uv_fs_req_cleanup(fs);
    if (e != 0 && e != UV_ENOENT) {
        panicf("uv_fs_unlink() fail uv error: %s", UVE(e));
    }

    e = uv_pipe_bind(ipc_server, IPC_PATH);
    assert_eqf(e, 0, "%d", "uv_pipe_bind() fail uv error: %s", UVE(e));

    e = uv_listen((uv_stream_t *) ipc_server, SOMAXCONN, ipc_listen_cb);
    assert_eqf(e, 0, "%d", "uv_listen() fail uv error: %s", UVE(e));

    thread_init();

    /*
     * [sic]
     * Returns non-zero if uv_stop() was called and there are still active handles or requests.
     * Returns zero in all other cases.
     */
    e = uv_run(main_loop, UV_RUN_DEFAULT);
    assert_eqf(e, 0, "%d", "uv_run() fail uv error: %s", UVE(e));

    LOG("Main thread main_loop terminated, join threads...");
    thread_join();

    e = uv_loop_close(main_loop);
    assert_eqf(e, 0, "%d", "uv_loop_close() fail uv error: %s", UVE(e));

#if UV_VER_REQ(>=, 1, 38, 0)
    /* see: libuv/test/task.h#MAKE_VALGRIND_HAPPY() */
    uv_library_shutdown();
#endif
    return 0;
}
@leiless
Copy link
Author

leiless commented Jul 11, 2020

XXX: The code snippet above is incomplete and does not compile!
Used in libuv/help#158

@leiless
Copy link
Author

leiless commented Jul 12, 2020

Updated uv_pipe_ipc_echo.c, which works properly; tested on Ubuntu 18.04 LTS and 20.04 LTS.

/*
 * Created Jul 4, 2020.
 */

#include <unistd.h>
#include <uv.h>

#include "test.h"
#include "utils.h"
#include "assertf.h"
#include "uv_utils.h"

/* Number of IPC client threads; also sizes the `threads' array below. */
#define NR_THREAD           3
/* Filesystem path the server pipe is bound to and the client pipes connect to. */
#define IPC_SOCK_PATH       "/tmp/.uv.ipc.sock"

/* Listening pipe handle of the IPC server; set up by ipc_server_init(). */
static uv_pipe_t *ipc_server = &(uv_pipe_t){};
/* Client thread handles: created by thread_init(), joined by thread_join(). */
static uv_thread_t threads[NR_THREAD];

/*
 * Allocation callback for uv_read_start(): lends a fixed-size slab to libuv.
 *
 * handle:         unused.
 * suggested_size: size hint from libuv; asserted not to exceed the slab.
 * buf:            receives the slab's address and capacity.
 */
static void ipc_alloc_cb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf)
{
    UNUSED(handle);
    /*
     * One slab per thread: every IPC client thread reads through this callback
     * on its own loop, so a slab shared by all of them would be overwritten by
     * one thread while another is still printing it.
     */
    static _Thread_local char slab[65536];
    assert_le(suggested_size, sizeof(slab), "%zu");
    buf->base = slab;
    buf->len = sizeof(slab);
}

/*
 * Read callback of an IPC client pipe.
 *
 * stream: the client pipe being read.
 * nread:  number of bytes available in `buf', or a negative libuv error code
 *         (only UV_EOF is expected).
 * buf:    buffer holding the received bytes.
 *
 * Received bytes are echoed to stderr; on a negative `nread' the pipe is closed.
 */
static void ipc_read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
{
    if (nread >= 0) {
        (void) fprintf(stderr, "%.*s", (int) nread, buf->base);
        return;
    }

    LOG_WARN("IPC client read met EOF");
    assert_eq(nread, UV_EOF, "%zd");
    /* stream is the `ipc_client' */
    uv_close((uv_handle_t *) stream, NULL);
}

/*
 * Connect callback of a client pipe.
 *
 * req:    the connect request; req->handle is the `ipc_client' in thread_cb().
 * status: a negative libuv error code when the connection attempt failed.
 *
 * A failed connection closes the pipe; a successful one starts reading from it.
 */
static void ipc_client_connect_cb(uv_connect_t *req, int status)
{
    uv_stream_t *client = req->handle;

    if (status < 0) {
        LOG_ERR("uv_connect() fail  path: %s uv error: %s", IPC_SOCK_PATH, UVE(status));
        uv_close((uv_handle_t *) client, NULL);
        return;
    }

    /*
     * Read through the stream API rather than uv_poll_start() on the pipe's fd,
     * see:
     *  https://github.com/libuv/help/issues/158
     *  https://github.com/libuv/libuv/issues/2387
     *  https://github.com/libuv/libuv/pull/2686
     * keywords: loop->watchers[w->fd] == w
     */
    int e = uv_read_start(client, ipc_alloc_cb, ipc_read_cb);
    assert_eqf(e, 0, "%d", "uv_read_start() fail  uv error: %s", UVE(e));
}

/*
 * Entry point of an IPC client thread.
 *
 * Sets up a private event loop, connects a pipe to IPC_SOCK_PATH and runs the
 * loop until it has nothing left to do, then closes the loop.
 *
 * arg: must be NULL.
 */
static void thread_cb(void *arg)
{
    assert_null(arg);

    /* Private loop of this thread. */
    uv_loop_t loop_storage = {0};
    int e = uv_loop_init(&loop_storage);
    assert_eqf(e, 0, "%d", "uv_loop_init() fail  uv error: %s", UVE(e));

    /* Client end of the IPC connection. */
    uv_pipe_t client_pipe = {0};
    e = uv_pipe_init(&loop_storage, &client_pipe, 0);
    assert_eq(e, 0, "%d");

    uv_connect_t connect_req;
    uv_pipe_connect(&connect_req, &client_pipe, IPC_SOCK_PATH, ipc_client_connect_cb);

    e = uv_run(&loop_storage, UV_RUN_DEFAULT);
    assert_eq(e, 0, "%d");

    e = uv_loop_close(&loop_storage);
    assert_eqf(e, 0, "%d", "uv_loop_close() fail  error: %s", UVE(e));
}

/*
 * Spawn one IPC client thread per slot of `threads'; each runs thread_cb().
 */
static void thread_init(void)
{
    LOG("Waiting IPC clients...");

    size_t idx = 0;
    while (idx < ARRAY_SIZE(threads)) {
        int e = uv_thread_create(&threads[idx], thread_cb, NULL);
        assert_eqf(e, 0, "%d", " uv_thread_create() fail  uv error: %s", UVE(e));
        idx++;
    }
}

/*
 * Wait for every thread started by thread_init() to finish.
 */
static void thread_join(void)
{
    size_t idx = 0;
    while (idx < ARRAY_SIZE(threads)) {
        int e = uv_thread_join(&threads[idx]);
        assert_eqf(e, 0, "%d", "uv_thread_join() fail  uv error: %s", UVE(e));
        idx++;
    }
}

/*
 * Round robin index: slot of `ipc_client' that receives the next write.
 * Writers advance it and wrap it back to 0 once it reaches NR_THREAD.
 */
static uint32_t rr_index = 0;
/* Server-side pipe handles of the accepted connections, filled by ipc_listen_cb(). */
static uv_pipe_t ipc_client[NR_THREAD];

/*
 * Selects the data source compiled in: non-zero builds fdgets() and
 * stdin_read_cb() (lines read from stdin), zero builds ipc_echo().
 */
#define USE_STDIO       0

#if USE_STDIO
/*
 * Read one line from a file descriptor, one byte at a time.
 *
 * s:    destination buffer.
 * size: capacity of `s' in bytes; at most size - 1 bytes are stored and, when
 *       size is non-zero, the result is NUL-terminated.
 * fd:   descriptor to read from.
 *
 * Reading stops after a newline (which is stored), when the buffer is full,
 * at end of file, or on a read error.
 *
 * Returns the number of bytes stored, or the negative read(2) result when the
 * last read failed.
 */
static ssize_t fdgets(char *s, size_t size, int fd)
{
    size_t stored = 0;
    ssize_t last = 0;
    int done = 0;

    /* `stored + 1 < size' keeps one byte free for the terminator. */
    while (!done && stored + 1 < size) {
        char byte;

        last = read(fd, &byte, 1);
        if (last > 0) {
            s[stored++] = byte;
            done = (byte == '\n');
        } else {
            done = 1;
        }
    }

    if (size > 0) {
        s[stored] = '\0';
    }
    return last < 0 ? last : (ssize_t) stored;
}

/*
 * Poll callback for stdin; called in main thread.
 *
 * Reads one line from stdin and writes it to the next IPC client in round
 * robin order. At end of input, stops polling and closes the poll handle,
 * all client pipes and the server pipe. Read errors are ignored.
 *
 * handle: the poll handle watching stdin.
 * status: a negative libuv error code when polling failed.
 * events: expected to be exactly UV_READABLE.
 */
static void stdin_read_cb(uv_poll_t *handle, int status, int events)
{
    if (status < 0) {
        LOG_ERR("uv poll error: %s", UVE(status));
        return;
    }
    assert_eq(events, UV_READABLE, "%#x");

    int fd;
    int e = uv_fileno((uv_handle_t *) handle, &fd);
    assert_eq(e, 0, "%d");
    assert_eq(fd, STDIN_FILENO, "%d");

    static char buffer[4096];
    ssize_t n = fdgets(buffer, sizeof(buffer), fd);

    if (n < 0) {
        /* Simply ignore if there is any error */
        return;
    }

    if (n == 0) {
        /* End of input: tear down everything owned by the main loop. */
        LOG_DBG("Met EOF in stdin");
        e = uv_poll_stop(handle);
        assert_eqf(e, 0, "%d", "uv_poll_stop() fail  uv error: %s", UVE(e));
        uv_close((uv_handle_t *) handle, NULL);

        uv_pipe_t *const end = ipc_client + ARRAY_SIZE(ipc_client);
        for (uv_pipe_t *client = ipc_client; client != end; client++) {
            uv_close((uv_handle_t *) client, NULL);
        }
        uv_close((uv_handle_t *) ipc_server, NULL);
        return;
    }

    /* Got a line: forward it to the client whose turn it is. */
    static uv_write_t write_req;
    uv_buf_t buf = uv_buf_init(buffer, n);
    const uint32_t target = rr_index;
    rr_index = (target + 1 == NR_THREAD) ? 0 : target + 1;

    e = uv_write(&write_req, (uv_stream_t *) &ipc_client[target], &buf, 1, NULL);
    if (e == 0) {
        return;
    }
    if (e != UV_EPIPE) {
        panicf("uv_write() fail  uv error: %s", UVE(e));
    } else {
        LOG_ERR("Broken pipe  i: %u", (unsigned) target);
    }
}
#else
/*
 * Endlessly write "+" to the IPC clients in round robin order, sleeping one
 * millisecond after each write. Never returns.
 *
 * A write that fails with UV_EPIPE is logged; any other failure panics.
 */
static void ipc_echo(void)
{
    static uv_write_t write_req;
    char *payload = "+";
    uv_buf_t buf = uv_buf_init(payload, strlen(payload));

    for (;;) {
        /* Pick the client whose turn it is, then advance the turn. */
        const uint32_t target = rr_index;
        rr_index = (target + 1 == NR_THREAD) ? 0 : target + 1;

        int e = uv_write(&write_req, (uv_stream_t *) &ipc_client[target], &buf, 1, NULL);
        if (e == UV_EPIPE) {
            LOG_ERR("Broken pipe  i: %u", (unsigned) target);
        } else if (e != 0) {
            panicf("uv_write() fail  uv error: %s", UVE(e));
        }

        /* XXX: In case of IPC client send too soon(need confirmation)? */
        ms_sleep(1);
    }
}
#endif

/**
 * Connection callback of the IPC server pipe; called in main thread.
 *
 * Accepts the pending connection into the next free slot of `ipc_client'.
 * When the NR_THREAD-th client has been accepted, the data source selected
 * by USE_STDIO is started.
 *
 * @param server    the listening stream, expected to be `ipc_server'
 * @param status    a negative libuv error code when the connection failed
 */
static void ipc_listen_cb(uv_stream_t *server, int status)
{
    /* Number of clients accepted so far; persists across invocations. */
    static size_t i = 0;
    assert_lt(i, NR_THREAD, "%zu");
    assert_eq(server, ipc_server, "%p");

    if (status < 0) {
        LOG_ERR("uv listen error: %s", UVE(status));
        return;
    }

    uv_pipe_t *slot = &ipc_client[i];
    int e = uv_pipe_init(server->loop, slot, 0);
    assert_eqf(e, 0, "%d", "uv_pipe_init() fail  uv error: %s", UVE(e));
    e = uv_accept(server, (uv_stream_t *) slot);
    assert_eqf(e, 0, "%d", "uv_accept() fail  uv error: %s", UVE(e));

    /* Still waiting for more clients. */
    if (++i != NR_THREAD) {
        return;
    }

#if USE_STDIO
    LOG("All IPC clients connected, ready to poll stdin input...");

    static poll_ctx_t poll_ctx;
    e = uv_util_poll_init_start(&poll_ctx, server->loop, STDIN_FILENO, UV_READABLE, stdin_read_cb);
    assert_eqf(e, 0, "%d", "uv_util_poll_init_start() fail  uv error: %s", UVE(e));
#else
    LOG("All IPC clients connected");
    ipc_echo();
#endif
}

/*
 * Set up the IPC server pipe on `loop': remove the socket file, bind the pipe
 * to IPC_SOCK_PATH and start listening with ipc_listen_cb().
 *
 * loop: event loop that owns the server pipe.
 * fs:   caller-provided request object used for the synchronous unlink; it is
 *       cleaned up before this function returns.
 */
static void ipc_server_init(uv_loop_t *loop, uv_fs_t *fs)
{
    int e = uv_pipe_init(loop, ipc_server, 0);
    assert_eqf(e, 0, "%d", "uv_pipe_init() fail  uv error: %s", UVE(e));

    /* Remove the socket file first; a missing file is not an error. */
    e = uv_fs_unlink(loop, fs, IPC_SOCK_PATH, NULL);
    /* Every finished fs request has to be handed back to libuv. */
    uv_fs_req_cleanup(fs);
    if (e != 0 && e != UV_ENOENT) {
        panicf("uv_fs_unlink() fail  uv error: %s", UVE(e));
    }

    e = uv_pipe_bind(ipc_server, IPC_SOCK_PATH);
    assert_eqf(e, 0, "%d", "uv_pipe_bind() fail  uv error: %s", UVE(e));

    e = uv_listen((uv_stream_t *) ipc_server, SOMAXCONN, ipc_listen_cb);
    assert_eqf(e, 0, "%d", "uv_listen() fail  uv error: %s", UVE(e));
}

/*
 * Test entry point.
 *
 * Starts the IPC server on the default loop, spawns the client threads, runs
 * the loop until it has nothing left to do, then joins the threads and closes
 * the loop.
 *
 * Returns 0; failures are reported through the assertion macros.
 */
int test_main(void)
{
    int e;
    uv_fs_t unlink_req;
    uv_loop_t *main_loop;

    LOG("%s", uv_version_string());

    main_loop = uv_default_loop();
    assert_nonnull(main_loop);

    /* The server has to be listening before any client tries to connect. */
    ipc_server_init(main_loop, &unlink_req);
    thread_init();

    /*
     * [sic]
     * Returns non-zero if uv_stop() was called and there are still active handles or requests.
     * Returns zero in all other cases.
     */
    e = uv_run(main_loop, UV_RUN_DEFAULT);
    assert_eqf(e, 0, "%d", "uv_run() fail  uv error: %s", UVE(e));

    LOG("Main thread main_loop terminated, join threads...");
    thread_join();

    e = uv_loop_close(main_loop);
    assert_eqf(e, 0, "%d", "uv_loop_close() fail  uv error: %s", UVE(e));

#if UV_VER_REQ(>=, 1, 38, 0)
    /* see: libuv/test/task.h#MAKE_VALGRIND_HAPPY() */
    uv_library_shutdown();
#endif

    return 0;
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment