Created
July 11, 2020 07:55
-
-
Save leiless/0a33314cd5ea046a5841ac3287620642 to your computer and use it in GitHub Desktop.
libuv pipe IPC echo client/server example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
 * Created Jul 4, 2020.
 */
#include <unistd.h> | |
#include <uv.h> | |
#include "test.h" | |
#include "utils.h" | |
#include "assertf.h" | |
#include "uv_utils.h" | |
#define NR_THREAD 3 | |
#define IPC_PATH "/tmp/.uv.ipc.sock" | |
static uv_pipe_t *ipc_server = &(uv_pipe_t){}; | |
static uv_thread_t threads[NR_THREAD]; | |
#define READ_BUFSZ 1024 | |
/**
 * Poll callback of an IPC client socket.
 *
 * It is registered from ipc_client_connect_cb() on the loop owned by the
 * client thread, so it may run on several threads at the same time.
 *
 * @param handle  poll handle; handle->data is the `ipc_client' in thread_cb()
 * @param status  negative libuv error code when polling failed
 * @param events  bitmask of the events that fired
 */
static void ipc_read_cb(uv_poll_t *handle, int status, int events)
{
    if (events & UV_DISCONNECT) {
        LOG_WARN("Connection reset by IPC server");
        int e = uv_poll_stop(handle);
        assert_eqf(e, 0, "%d", "uv_poll_stop() fail uv error: %s", UVE(e));
        /*
         * In order to safely close the loop, you need to uv_close() the
         * uv_pipe_t while the loop is active (before uv_run() returns)
         * see: https://github.com/libuv/help/issues/156
         */
        uv_close((uv_handle_t *) handle, NULL);
        /* handle->data is the `ipc_client' in thread_cb() */
        uv_close((uv_handle_t *) handle->data, NULL);
        return;
    }
    if (status < 0) {
        LOG_ERR("uv poll error: %s", UVE(status));
        return;
    }

    int fd;
    int e = uv_fileno((uv_handle_t *) handle, &fd);
    assert_eq(e, 0, "%d");

    /*
     * Automatic storage on purpose: every client thread runs this callback
     * on its own loop, a static buffer would be shared between them.
     */
    char buffer[READ_BUFSZ];
    ssize_t nread;
    /* Restart the read when it was interrupted by a signal */
    do {
        nread = read(fd, buffer, sizeof(buffer));
    } while (nread < 0 && errno == EINTR);

    if (nread < 0) {
        LOG_ERR("read(2) fail fd: %d errno: %d", fd, errno);
    } else if (nread == 0) {
        LOG_ERR("Connection reset by peer");
    } else {
        /* Echo exactly the bytes received; the buffer is not NUL-terminated */
        (void) fprintf(stderr, "%.*s", (int) nread, buffer);
    }
}
/**
 * Connect callback of an IPC client pipe.
 *
 * On success the pipe's file descriptor is handed to
 * uv_util_poll_init_start() so that ipc_read_cb() is invoked for
 * UV_READABLE and UV_DISCONNECT events.
 *
 * @param req     connect request; req->data is the poll_ctx_t set up in
 *                thread_cb(), req->handle is the client pipe
 * @param status  negative libuv error code when the connect failed
 */
static void ipc_client_connect_cb(uv_connect_t *req, int status)
{
    if (status < 0) {
        LOG_ERR("uv_connect() fail uv error: %s", UVE(status));
        /*
         * Close the pipe while the loop is still running, otherwise
         * uv_loop_close() in thread_cb() finds an open handle.
         */
        uv_close((uv_handle_t *) req->handle, NULL);
        return;
    }

    int fd;
    int e = uv_fileno((uv_handle_t *) req->handle, &fd);
    assert_eq(e, 0, "%d");

    poll_ctx_t *p = req->data;
    /* req->handle->loop is the `thread_loop' */
    e = uv_util_poll_init_start(p, req->handle->loop, fd, UV_READABLE | UV_DISCONNECT, ipc_read_cb);
    assert_eq(e, 0, "%d");
}
static void thread_cb(void *arg) | |
{ | |
assert_null(arg); | |
uv_loop_t *thread_loop = &(uv_loop_t){}; | |
int e = uv_loop_init(thread_loop); | |
assert_eqf(e, 0, "%d", "uv_loop_init() fail uv error: %s", UVE(e)); | |
uv_pipe_t *ipc_client = &(uv_pipe_t){}; | |
e = uv_pipe_init(thread_loop, ipc_client, 0); | |
assert_eq(e, 0, "%d"); | |
poll_ctx_t p; | |
p.handle.data = ipc_client; | |
uv_connect_t *conn = &(uv_connect_t){}; | |
conn->data = &p; | |
uv_pipe_connect(conn, ipc_client, IPC_PATH, ipc_client_connect_cb); | |
e = uv_run(thread_loop, UV_RUN_DEFAULT); | |
assert_eq(e, 0, "%d"); | |
e = uv_loop_close(thread_loop); | |
assert_eqf(e, 0, "%d", "uv_loop_close() fail error: %s", UVE(e)); | |
} | |
static void thread_init(void) | |
{ | |
int e; | |
LOG("Waiting IPC clients..."); | |
for (size_t i = 0; i < ARRAY_SIZE(threads); i++) { | |
e = uv_thread_create(&threads[i], thread_cb, NULL); | |
assert_eqf(e, 0, "%d", " uv_thread_create() fail uv error: %s", UVE(e)); | |
} | |
} | |
static void thread_join(void) | |
{ | |
int e; | |
for (size_t i = 0; i < ARRAY_SIZE(threads); i++) { | |
e = uv_thread_join(&threads[i]); | |
assert_eqf(e, 0, "%d", "uv_thread_join() fail uv error: %s", UVE(e)); | |
} | |
} | |
/* Round robin index */ | |
static uint32_t rr_index = 0; | |
static uv_pipe_t ipc_client[NR_THREAD]; | |
#define USE_STDIO 0 | |
#if USE_STDIO | |
/**
 * fgets(3)-like line reader working on a raw file descriptor.
 *
 * Reads one byte at a time until a newline was stored, `size - 1' bytes
 * were stored, or read(2) reports EOF or an error.  The result is
 * NUL-terminated whenever `size' is non-zero.
 *
 * @param s     destination buffer
 * @param size  capacity of `s' in bytes, including the terminator
 * @param fd    descriptor to read from
 * @return      number of bytes stored (0 when nothing was read), or the
 *              negative read(2) result when the last read failed
 */
static ssize_t fdgets(char *s, size_t size, int fd)
{
    size_t len = 0;
    ssize_t rc = 0;

    while (len + 1 < size) {
        char ch;

        rc = read(fd, &ch, 1);
        if (rc <= 0) {
            break;
        }
        s[len] = ch;
        len += (size_t) rc;
        if (ch == '\n') {
            break;
        }
    }

    if (size != 0) {
        s[len] = '\0';
    }
    if (rc < 0) {
        return rc;
    }
    return (ssize_t) len;
}
/* | |
* Called in main thread | |
*/ | |
static void stdin_read_cb(uv_poll_t *handle, int status, int events) | |
{ | |
if (status < 0) { | |
LOG_ERR("uv poll error: %s", UVE(status)); | |
return; | |
} | |
assert_eq(events, UV_READABLE, "%#x"); | |
int fd; | |
int e = uv_fileno((uv_handle_t *) handle, &fd); | |
assert_eq(e, 0, "%d"); | |
assert_eq(fd, STDIN_FILENO, "%d"); | |
static char buffer[4096]; | |
ssize_t n = fdgets(buffer, sizeof(buffer), fd); | |
if (n > 0) { | |
static uv_write_t write_req; | |
uv_buf_t buf = uv_buf_init(buffer, n); | |
uv_stream_t *stream = (uv_stream_t *) &ipc_client[rr_index]; | |
if (++rr_index == NR_THREAD) { | |
rr_index = 0; | |
} | |
e = uv_write(&write_req, stream, &buf, 1, NULL); | |
if (e != 0) { | |
if (e == UV_EPIPE) { | |
LOG_ERR("Broken pipe i: %u", rr_index ? rr_index - 1 : NR_THREAD); | |
} else { | |
panicf("uv_write() fail uv error: %s", UVE(e)); | |
} | |
} | |
} else if (n == 0) { | |
LOG_DBG("Met EOF in stdin"); | |
e = uv_poll_stop(handle); | |
assert_eqf(e, 0, "%d", "uv_poll_stop() fail uv error: %s", UVE(e)); | |
uv_close((uv_handle_t *) handle, NULL); | |
for (size_t i = 0; i < ARRAY_SIZE(ipc_client); i++) { | |
uv_close((uv_handle_t *) &ipc_client[i], NULL); | |
} | |
uv_close((uv_handle_t *) ipc_server, NULL); | |
} else { | |
/* Simply ignore if there is any error */ | |
} | |
} | |
#else | |
static void ipc_echo(void) | |
{ | |
static uv_write_t write_req; | |
char *buffer = "+"; | |
size_t n = strlen(buffer); | |
uv_buf_t buf = uv_buf_init(buffer, n); | |
while (1) { | |
uv_stream_t *stream = (uv_stream_t *) &ipc_client[rr_index]; | |
if (++rr_index == NR_THREAD) { | |
rr_index = 0; | |
} | |
int e = uv_write(&write_req, stream, &buf, 1, NULL); | |
if (e != 0) { | |
if (e == UV_EPIPE) { | |
LOG_ERR("Broken pipe i: %u", rr_index ? rr_index - 1 : NR_THREAD); | |
} else { | |
panicf("uv_write() fail uv error: %s", UVE(e)); | |
} | |
} | |
/* XXX: In case of IPC client send too soon(need confirmation)? */ | |
ms_sleep(1); | |
} | |
} | |
#endif | |
/** | |
* Called in main thread | |
*/ | |
static void ipc_listen_cb(uv_stream_t *server, int status) | |
{ | |
static size_t i = 0; | |
assert_lt(i, NR_THREAD, "%zu"); | |
assert_eq(server, ipc_server, "%p"); | |
if (status < 0) { | |
LOG_ERR("uv listen error: %s", UVE(status)); | |
return; | |
} | |
int e = uv_pipe_init(server->loop, &ipc_client[i], 0); | |
assert_eqf(e, 0, "%d", "uv_pipe_init() fail uv error: %s", UVE(e)); | |
e = uv_accept(server, (uv_stream_t *) &ipc_client[i]); | |
assert_eqf(e, 0, "%d", "uv_accept() fail uv error: %s", UVE(e)); | |
if (++i == NR_THREAD) { | |
#if USE_STDIO | |
LOG("All IPC clients connected, ready to poll stdin input..."); | |
static poll_ctx_t poll_ctx; | |
e = uv_util_poll_init_start(&poll_ctx, server->loop, STDIN_FILENO, UV_READABLE, stdin_read_cb); | |
assert_eqf(e, 0, "%d", "uv_util_poll_init_start() fail uv error: %s", UVE(e)); | |
#else | |
LOG("All IPC clients connected"); | |
ipc_echo(); | |
#endif | |
} | |
} | |
int test_main(void) | |
{ | |
LOG("%s", uv_version_string()); | |
uv_loop_t *main_loop = uv_default_loop(); | |
assert_nonnull(main_loop); | |
int e = uv_pipe_init(main_loop, ipc_server, 0); | |
assert_eqf(e, 0, "%d", "uv_pipe_init() fail uv error: %s", UVE(e)); | |
uv_fs_t *fs = &(uv_fs_t){}; | |
e = uv_fs_unlink(main_loop, fs, IPC_PATH, NULL); | |
if (e != 0 && e != UV_ENOENT) { | |
panicf("uv_fs_unlink() fail uv error: %s", UVE(e)); | |
} | |
e = uv_pipe_bind(ipc_server, IPC_PATH); | |
assert_eqf(e, 0, "%d", "uv_pipe_bind() fail uv error: %s", UVE(e)); | |
e = uv_listen((uv_stream_t *) ipc_server, SOMAXCONN, ipc_listen_cb); | |
assert_eqf(e, 0, "%d", "uv_listen() fail uv error: %s", UVE(e)); | |
thread_init(); | |
/* | |
* [sic] | |
* Returns non-zero if uv_stop() was called and there are still active handles or requests. | |
* Returns zero in all other cases. | |
*/ | |
e = uv_run(main_loop, UV_RUN_DEFAULT); | |
assert_eqf(e, 0, "%d", "uv_run() fail uv error: %s", UVE(e)); | |
LOG("Main thread main_loop terminated, join threads..."); | |
thread_join(); | |
e = uv_loop_close(main_loop); | |
assert_eqf(e, 0, "%d", "uv_loop_close() fail uv error: %s", UVE(e)); | |
#if UV_VER_REQ(>=, 1, 38, 0) | |
/* see: libuv/test/task.h#MAKE_VALGRIND_HAPPY() */ | |
uv_library_shutdown(); | |
#endif | |
return 0; | |
} | |
Updated uv_pipe_ipc_echo.c, which works properly; tested under Ubuntu 18.04 and 20.04 LTS.
/*
* Created Jul 4, 2020.
*/
#include <unistd.h>
#include <uv.h>
#include "test.h"
#include "utils.h"
#include "assertf.h"
#include "uv_utils.h"
/* Number of IPC client threads; also the size of every per-client array. */
#define NR_THREAD 3
/* Path the server pipe is bound to and every client connects to. */
#define IPC_SOCK_PATH "/tmp/.uv.ipc.sock"
/* Listening server pipe, backed by a file-scope compound literal. */
static uv_pipe_t *ipc_server = &(uv_pipe_t){};
/* Thread ids filled by thread_init() and joined by thread_join(). */
static uv_thread_t threads[NR_THREAD];
/**
 * Allocation callback passed to uv_read_start() for the IPC clients.
 *
 * Hands out a fixed 64 KiB slab instead of allocating per read.  The slab
 * is thread-local because every client thread starts reading on its own
 * loop with this callback; a single static slab would be written by all of
 * them concurrently.
 *
 * @param handle          unused
 * @param suggested_size  size requested by libuv, must fit into the slab
 * @param buf             receives the slab address and its full length
 */
static void ipc_alloc_cb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf)
{
    UNUSED(handle);
    static _Thread_local char slab[65536];
    assert_le(suggested_size, sizeof(slab), "%zu");
    buf->base = slab;
    buf->len = sizeof(slab);
}
/**
 * Read callback of an IPC client pipe.
 *
 * Received bytes are echoed to stderr.  A negative `nread' is expected to
 * be UV_EOF, in which case the pipe is closed.
 *
 * @param stream  the client pipe (the `ipc_client' of thread_cb())
 * @param nread   number of bytes read, or a negative libuv error code
 * @param buf     buffer holding the data that was read
 */
static void ipc_read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
{
    if (nread >= 0) {
        /* Print exactly nread bytes; the buffer is not NUL-terminated */
        (void) fprintf(stderr, "%.*s", (int) nread, buf->base);
        return;
    }

    LOG_WARN("IPC client read met EOF");
    assert_eq(nread, UV_EOF, "%zd");
    /* stream is the `ipc_client' */
    uv_close((uv_handle_t *) stream, NULL);
}
/**
 * Connect callback of an IPC client pipe.
 *
 * Starts reading from the pipe on success and closes it on failure.
 *
 * @param req     connect request; req->handle is the `ipc_client' in
 *                thread_cb()
 * @param status  negative libuv error code when the connect failed
 */
static void ipc_client_connect_cb(uv_connect_t *req, int status)
{
    if (status >= 0) {
        /*
         * Don't use uv_poll_start() here
         * see:
         *  https://github.com/libuv/help/issues/158
         *  https://github.com/libuv/libuv/issues/2387
         *  https://github.com/libuv/libuv/pull/2686
         * keywords: loop->watchers[w->fd] == w
         */
        int e = uv_read_start(req->handle, ipc_alloc_cb, ipc_read_cb);
        assert_eqf(e, 0, "%d", "uv_read_start() fail uv error: %s", UVE(e));
        return;
    }

    LOG_ERR("uv_connect() fail path: %s uv error: %s", IPC_SOCK_PATH, UVE(status));
    /* Nothing more to do with this pipe, release it */
    uv_close((uv_handle_t *) req->handle, NULL);
}
/**
 * Entry point of an IPC client thread.
 *
 * Creates a private loop, starts connecting a pipe to IPC_SOCK_PATH, runs
 * the loop until it has nothing left to do and finally closes the loop.
 *
 * @param arg  must be NULL
 */
static void thread_cb(void *arg)
{
    uv_connect_t connect_req;
    uv_pipe_t *client = &(uv_pipe_t){};
    uv_loop_t *loop = &(uv_loop_t){};
    int e;

    assert_null(arg);

    e = uv_loop_init(loop);
    assert_eqf(e, 0, "%d", "uv_loop_init() fail uv error: %s", UVE(e));

    e = uv_pipe_init(loop, client, 0);
    assert_eq(e, 0, "%d");

    /* The outcome is reported to ipc_client_connect_cb() by the loop */
    uv_pipe_connect(&connect_req, client, IPC_SOCK_PATH, ipc_client_connect_cb);

    e = uv_run(loop, UV_RUN_DEFAULT);
    assert_eq(e, 0, "%d");

    e = uv_loop_close(loop);
    assert_eqf(e, 0, "%d", "uv_loop_close() fail error: %s", UVE(e));
}
static void thread_init(void)
{
int e;
LOG("Waiting IPC clients...");
for (size_t i = 0; i < ARRAY_SIZE(threads); i++) {
e = uv_thread_create(&threads[i], thread_cb, NULL);
assert_eqf(e, 0, "%d", " uv_thread_create() fail uv error: %s", UVE(e));
}
}
static void thread_join(void)
{
int e;
for (size_t i = 0; i < ARRAY_SIZE(threads); i++) {
e = uv_thread_join(&threads[i]);
assert_eqf(e, 0, "%d", "uv_thread_join() fail uv error: %s", UVE(e));
}
}
/* Round robin index: slot of ipc_client[] that receives the next write */
static uint32_t rr_index = 0;
/* Server-side pipes of the accepted clients, filled by ipc_listen_cb() */
static uv_pipe_t ipc_client[NR_THREAD];
/* 1: forward stdin lines to the clients; 0: write "+" to them in a loop */
#define USE_STDIO 0
#if USE_STDIO
/**
 * fgets(3)-like line reader working on a raw file descriptor.
 *
 * Reads one byte at a time until a newline was stored, `size - 1' bytes
 * were stored, or read(2) reports EOF or an error.  The result is
 * NUL-terminated whenever `size' is non-zero.
 *
 * @param s     destination buffer
 * @param size  capacity of `s' in bytes, including the terminator
 * @param fd    descriptor to read from
 * @return      number of bytes stored (0 when nothing was read), or the
 *              negative read(2) result when the last read failed
 */
static ssize_t fdgets(char *s, size_t size, int fd)
{
    size_t stored = 0;
    ssize_t last = 0;

    for (;;) {
        char byte;

        if (stored + 1 >= size) {
            /* No room left besides the terminator */
            break;
        }
        last = read(fd, &byte, 1);
        if (last <= 0) {
            /* EOF or error */
            break;
        }
        s[stored] = byte;
        stored += (size_t) last;
        if (byte == '\n') {
            break;
        }
    }

    if (size > 0) {
        s[stored] = '\0';
    }
    return last < 0 ? last : (ssize_t) stored;
}
/**
 * Poll callback for stdin; called in main thread.
 *
 * Reads one line from stdin and writes it to the next client in round
 * robin order.  On EOF the stdin poll handle, every client pipe and the
 * server pipe are closed.  Read errors are ignored.
 *
 * @param handle  poll handle watching STDIN_FILENO
 * @param status  negative libuv error code when polling failed
 * @param events  expected to be exactly UV_READABLE
 */
static void stdin_read_cb(uv_poll_t *handle, int status, int events)
{
    static char buffer[4096];
    static uv_write_t write_req;
    int fd;
    int e;

    if (status < 0) {
        LOG_ERR("uv poll error: %s", UVE(status));
        return;
    }
    assert_eq(events, UV_READABLE, "%#x");

    e = uv_fileno((uv_handle_t *) handle, &fd);
    assert_eq(e, 0, "%d");
    assert_eq(fd, STDIN_FILENO, "%d");

    ssize_t n = fdgets(buffer, sizeof(buffer), fd);
    if (n < 0) {
        /* Simply ignore if there is any error */
        return;
    }

    if (n == 0) {
        LOG_DBG("Met EOF in stdin");
        e = uv_poll_stop(handle);
        assert_eqf(e, 0, "%d", "uv_poll_stop() fail uv error: %s", UVE(e));
        uv_close((uv_handle_t *) handle, NULL);
        size_t k = 0;
        while (k < ARRAY_SIZE(ipc_client)) {
            uv_close((uv_handle_t *) &ipc_client[k], NULL);
            k++;
        }
        uv_close((uv_handle_t *) ipc_server, NULL);
        return;
    }

    /* Remember the receiver, then advance the round robin index */
    uint32_t target = rr_index;
    rr_index = (rr_index + 1 == NR_THREAD) ? 0 : rr_index + 1;

    uv_buf_t buf = uv_buf_init(buffer, n);
    e = uv_write(&write_req, (uv_stream_t *) &ipc_client[target], &buf, 1, NULL);
    if (e == 0) {
        return;
    }
    if (e != UV_EPIPE) {
        panicf("uv_write() fail uv error: %s", UVE(e));
    } else {
        LOG_ERR("Broken pipe i: %u", target);
    }
}
#else
static void ipc_echo(void)
{
static uv_write_t write_req;
char *buffer = "+";
size_t n = strlen(buffer);
uv_buf_t buf = uv_buf_init(buffer, n);
while (1) {
uv_stream_t *stream = (uv_stream_t *) &ipc_client[rr_index];
if (++rr_index == NR_THREAD) {
rr_index = 0;
}
int e = uv_write(&write_req, stream, &buf, 1, NULL);
if (e != 0) {
if (e == UV_EPIPE) {
LOG_ERR("Broken pipe i: %u", (rr_index ? rr_index : NR_THREAD) - 1);
} else {
panicf("uv_write() fail uv error: %s", UVE(e));
}
}
/* XXX: In case of IPC client send too soon(need confirmation)? */
ms_sleep(1);
}
}
#endif
/**
 * Connection callback of the listening server pipe; called in main thread.
 *
 * Accepts each incoming client into the next free slot of ipc_client[].
 * After the NR_THREAD-th client was accepted it either starts polling stdin
 * (USE_STDIO) or enters ipc_echo().
 *
 * @param server  the listening stream, must be ipc_server
 * @param status  negative libuv error code when the connection failed
 */
static void ipc_listen_cb(uv_stream_t *server, int status)
{
    /* Number of clients accepted so far */
    static size_t i = 0;
    int e;

    assert_lt(i, NR_THREAD, "%zu");
    assert_eq(server, ipc_server, "%p");
    if (status < 0) {
        LOG_ERR("uv listen error: %s", UVE(status));
        return;
    }

    uv_pipe_t *slot = &ipc_client[i];
    e = uv_pipe_init(server->loop, slot, 0);
    assert_eqf(e, 0, "%d", "uv_pipe_init() fail uv error: %s", UVE(e));
    e = uv_accept(server, (uv_stream_t *) slot);
    assert_eqf(e, 0, "%d", "uv_accept() fail uv error: %s", UVE(e));

    i++;
    if (i != NR_THREAD) {
        /* Still waiting for the remaining clients */
        return;
    }
#if USE_STDIO
    LOG("All IPC clients connected, ready to poll stdin input...");
    static poll_ctx_t poll_ctx;
    e = uv_util_poll_init_start(&poll_ctx, server->loop, STDIN_FILENO, UV_READABLE, stdin_read_cb);
    assert_eqf(e, 0, "%d", "uv_util_poll_init_start() fail uv error: %s", UVE(e));
#else
    LOG("All IPC clients connected");
    ipc_echo();
#endif
}
/**
 * Initialise the server pipe, bind it to IPC_SOCK_PATH and start listening.
 *
 * A socket file left behind at IPC_SOCK_PATH is unlinked first; a missing
 * file is not an error.
 *
 * @param loop  loop the server pipe is attached to
 * @param fs    caller-provided request storage for the synchronous unlink;
 *              it is cleaned up before this function returns
 */
static void ipc_server_init(uv_loop_t *loop, uv_fs_t *fs)
{
    int e = uv_pipe_init(loop, ipc_server, 0);
    assert_eqf(e, 0, "%d", "uv_pipe_init() fail uv error: %s", UVE(e));

    /* NULL callback: the unlink runs synchronously and returns its result */
    e = uv_fs_unlink(loop, fs, IPC_SOCK_PATH, NULL);
    /* The request is finished once the synchronous call returns */
    uv_fs_req_cleanup(fs);
    if (e != 0 && e != UV_ENOENT) {
        panicf("uv_fs_unlink() fail uv error: %s", UVE(e));
    }

    e = uv_pipe_bind(ipc_server, IPC_SOCK_PATH);
    assert_eqf(e, 0, "%d", "uv_pipe_bind() fail uv error: %s", UVE(e));
    e = uv_listen((uv_stream_t *) ipc_server, SOMAXCONN, ipc_listen_cb);
    assert_eqf(e, 0, "%d", "uv_listen() fail uv error: %s", UVE(e));
}
/**
 * Test entry point: set up the IPC server on the default loop, start the
 * client threads, run the loop and tear everything down.
 *
 * @return 0; every failure is reported through the assertion/panic macros
 */
int test_main(void)
{
    uv_fs_t fs;
    uv_loop_t *main_loop;
    int e;

    LOG("%s", uv_version_string());

    main_loop = uv_default_loop();
    assert_nonnull(main_loop);

    /* The server must be listening before the clients try to connect */
    ipc_server_init(main_loop, &fs);
    thread_init();

    /*
     * [sic]
     * Returns non-zero if uv_stop() was called and there are still active handles or requests.
     * Returns zero in all other cases.
     */
    e = uv_run(main_loop, UV_RUN_DEFAULT);
    assert_eqf(e, 0, "%d", "uv_run() fail uv error: %s", UVE(e));

    LOG("Main thread main_loop terminated, join threads...");
    thread_join();

    e = uv_loop_close(main_loop);
    assert_eqf(e, 0, "%d", "uv_loop_close() fail uv error: %s", UVE(e));
#if UV_VER_REQ(>=, 1, 38, 0)
    /* see: libuv/test/task.h#MAKE_VALGRIND_HAPPY() */
    uv_library_shutdown();
#endif
    return 0;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
XXX: The code snippet above isn't complete; it cannot compile!
Used in libuv/help#158