Created
July 11, 2020 07:55
-
-
Save leiless/0a33314cd5ea046a5841ac3287620642 to your computer and use it in GitHub Desktop.
libuv pipe IPC echo client/server example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Created Jul 4, 2020. | |
*/ | |
#include <unistd.h> | |
#include <uv.h> | |
#include "test.h" | |
#include "utils.h" | |
#include "assertf.h" | |
#include "uv_utils.h" | |
/* Number of IPC client threads; sizes both threads[] and ipc_client[]. */
#define NR_THREAD 3 | |
/* Filesystem path the server pipe is bound to and the client pipes connect to. */
#define IPC_PATH "/tmp/.uv.ipc.sock" | |
/* Listening (server side) pipe handle, backed by a file-scope compound literal. */
static uv_pipe_t *ipc_server = &(uv_pipe_t){}; | |
/* Client thread handles, created in thread_init() and joined in thread_join(). */
static uv_thread_t threads[NR_THREAD]; | |
/* Size in bytes of the buffer ipc_read_cb() hands to read(2). */
#define READ_BUFSZ 1024 | |
/**
 * Poll callback for the client side of the IPC pipe.
 *
 * Registered by ipc_client_connect_cb(), so it runs on the event loop of
 * whichever client thread owns `handle'.  Data received from the server is
 * echoed verbatim to stderr.
 *
 * @param handle    poll handle watching the connected pipe's fd;
 *                  handle->data is the `ipc_client' pipe from thread_cb()
 * @param status    0 on success, a negative libuv error code otherwise
 * @param events    bitmask of the events that fired
 *                  (UV_READABLE and/or UV_DISCONNECT)
 */
static void ipc_read_cb(uv_poll_t *handle, int status, int events)
{
    if (events & UV_DISCONNECT) {
        LOG_WARN("Connection reset by IPC server");
        int e = uv_poll_stop(handle);
        assert_eqf(e, 0, "%d", "uv_poll_stop() fail uv error: %s", UVE(e));
        /*
         * In order to safely close the loop, you need to uv_close() the
         * uv_pipe_t while the loop is active (before uv_run() returns)
         * see: https://github.com/libuv/help/issues/156
         */
        uv_close((uv_handle_t *) handle, NULL);
        /* handle->data is the `ipc_client' in thread_cb() */
        uv_close((uv_handle_t *) handle->data, NULL);
        return;
    }

    if (status < 0) {
        LOG_ERR("uv poll error: %s", UVE(status));
        return;
    }

    int fd;
    int e = uv_fileno((uv_handle_t *) handle, &fd);
    assert_eq(e, 0, "%d");

    /*
     * Deliberately automatic storage: this callback runs concurrently on
     * NR_THREAD independent event loops, so a function-static buffer would
     * be shared (and raced on) by every client thread.
     */
    char buffer[READ_BUFSZ];
    ssize_t nread;

    /* Restart the read if a signal interrupted it before any data arrived */
    do {
        nread = read(fd, buffer, sizeof(buffer));
    } while (nread < 0 && errno == EINTR);

    if (nread < 0) {
        LOG_ERR("read(2) fail fd: %d errno: %d", fd, errno);
    } else if (nread == 0) {
        LOG_ERR("Connection reset by peer");
    } else {
        /* buffer is not NUL-terminated, hence the explicit precision */
        (void) fprintf(stderr, "%.*s", (int) nread, buffer);
    }
}
/**
 * Connect callback of a client thread's pipe.
 *
 * Once the connection is up, starts polling the pipe's file descriptor for
 * readable/disconnect events, dispatching them to ipc_read_cb().
 *
 * @param req       connect request; req->data carries the poll_ctx_t that
 *                  thread_cb() attached, req->handle is the connected pipe
 * @param status    0 on success, a negative libuv error code otherwise
 */
static void ipc_client_connect_cb(uv_connect_t *req, int status)
{
    if (status < 0) {
        LOG_ERR("uv_connect() fail uv error: %s", UVE(status));
        return;
    }

    uv_stream_t *pipe_stream = req->handle;
    poll_ctx_t *ctx = req->data;

    int sockfd;
    int e = uv_fileno((uv_handle_t *) pipe_stream, &sockfd);
    assert_eq(e, 0, "%d");

    /* pipe_stream->loop is the `thread_loop' */
    e = uv_util_poll_init_start(ctx, pipe_stream->loop, sockfd,
                                UV_READABLE | UV_DISCONNECT, ipc_read_cb);
    assert_eq(e, 0, "%d");
}
static void thread_cb(void *arg) | |
{ | |
assert_null(arg); | |
uv_loop_t *thread_loop = &(uv_loop_t){}; | |
int e = uv_loop_init(thread_loop); | |
assert_eqf(e, 0, "%d", "uv_loop_init() fail uv error: %s", UVE(e)); | |
uv_pipe_t *ipc_client = &(uv_pipe_t){}; | |
e = uv_pipe_init(thread_loop, ipc_client, 0); | |
assert_eq(e, 0, "%d"); | |
poll_ctx_t p; | |
p.handle.data = ipc_client; | |
uv_connect_t *conn = &(uv_connect_t){}; | |
conn->data = &p; | |
uv_pipe_connect(conn, ipc_client, IPC_PATH, ipc_client_connect_cb); | |
e = uv_run(thread_loop, UV_RUN_DEFAULT); | |
assert_eq(e, 0, "%d"); | |
e = uv_loop_close(thread_loop); | |
assert_eqf(e, 0, "%d", "uv_loop_close() fail error: %s", UVE(e)); | |
} | |
static void thread_init(void) | |
{ | |
int e; | |
LOG("Waiting IPC clients..."); | |
for (size_t i = 0; i < ARRAY_SIZE(threads); i++) { | |
e = uv_thread_create(&threads[i], thread_cb, NULL); | |
assert_eqf(e, 0, "%d", " uv_thread_create() fail uv error: %s", UVE(e)); | |
} | |
} | |
static void thread_join(void) | |
{ | |
int e; | |
for (size_t i = 0; i < ARRAY_SIZE(threads); i++) { | |
e = uv_thread_join(&threads[i]); | |
assert_eqf(e, 0, "%d", "uv_thread_join() fail uv error: %s", UVE(e)); | |
} | |
} | |
/* Round robin index: the ipc_client[] slot the next write is sent to */ | |
static uint32_t rr_index = 0; | |
/* Server-side pipes of the accepted connections, filled in by ipc_listen_cb() */
static uv_pipe_t ipc_client[NR_THREAD]; | |
/* Non-zero: forward stdin lines to the clients (stdin_read_cb); zero: send "+" forever (ipc_echo) */
#define USE_STDIO 0 | |
#if USE_STDIO | |
/**
 * Read one line from a file descriptor, in the spirit of fgets(3).
 *
 * Reads byte by byte until a newline has been stored, `size - 1' bytes have
 * been stored, EOF is met or read(2) fails.  The newline, if any, is kept.
 * Whenever `size' is non-zero the result is NUL-terminated.
 *
 * @param s     destination buffer of at least `size' bytes
 * @param size  capacity of `s' in bytes, terminator included
 * @param fd    file descriptor to read from
 * @return      number of bytes stored (terminator excluded), 0 on immediate
 *              EOF (or when `size' < 2), or a negative value if read(2)
 *              failed, in which case errno is left as set by read(2)
 */
static ssize_t fdgets(char *s, size_t size, int fd)
{
    size_t nread = 0;
    ssize_t n = 0;

    while (nread + 1 < size) {
        char c;

        n = read(fd, &c, 1);
        if (n < 0 && errno == EINTR) {
            /* Interrupted by a signal before any byte was read: retry */
            continue;
        }
        if (n <= 0) {
            /* EOF or a hard error */
            break;
        }

        s[nread++] = c;
        if (c == '\n') {
            break;
        }
    }

    if (size != 0) {
        s[nread] = '\0';
    }
    return n >= 0 ? (ssize_t) nread : n;
}
/* | |
* Called in main thread | |
*/ | |
static void stdin_read_cb(uv_poll_t *handle, int status, int events) | |
{ | |
if (status < 0) { | |
LOG_ERR("uv poll error: %s", UVE(status)); | |
return; | |
} | |
assert_eq(events, UV_READABLE, "%#x"); | |
int fd; | |
int e = uv_fileno((uv_handle_t *) handle, &fd); | |
assert_eq(e, 0, "%d"); | |
assert_eq(fd, STDIN_FILENO, "%d"); | |
static char buffer[4096]; | |
ssize_t n = fdgets(buffer, sizeof(buffer), fd); | |
if (n > 0) { | |
static uv_write_t write_req; | |
uv_buf_t buf = uv_buf_init(buffer, n); | |
uv_stream_t *stream = (uv_stream_t *) &ipc_client[rr_index]; | |
if (++rr_index == NR_THREAD) { | |
rr_index = 0; | |
} | |
e = uv_write(&write_req, stream, &buf, 1, NULL); | |
if (e != 0) { | |
if (e == UV_EPIPE) { | |
LOG_ERR("Broken pipe i: %u", rr_index ? rr_index - 1 : NR_THREAD); | |
} else { | |
panicf("uv_write() fail uv error: %s", UVE(e)); | |
} | |
} | |
} else if (n == 0) { | |
LOG_DBG("Met EOF in stdin"); | |
e = uv_poll_stop(handle); | |
assert_eqf(e, 0, "%d", "uv_poll_stop() fail uv error: %s", UVE(e)); | |
uv_close((uv_handle_t *) handle, NULL); | |
for (size_t i = 0; i < ARRAY_SIZE(ipc_client); i++) { | |
uv_close((uv_handle_t *) &ipc_client[i], NULL); | |
} | |
uv_close((uv_handle_t *) ipc_server, NULL); | |
} else { | |
/* Simply ignore if there is any error */ | |
} | |
} | |
#else | |
static void ipc_echo(void) | |
{ | |
static uv_write_t write_req; | |
char *buffer = "+"; | |
size_t n = strlen(buffer); | |
uv_buf_t buf = uv_buf_init(buffer, n); | |
while (1) { | |
uv_stream_t *stream = (uv_stream_t *) &ipc_client[rr_index]; | |
if (++rr_index == NR_THREAD) { | |
rr_index = 0; | |
} | |
int e = uv_write(&write_req, stream, &buf, 1, NULL); | |
if (e != 0) { | |
if (e == UV_EPIPE) { | |
LOG_ERR("Broken pipe i: %u", rr_index ? rr_index - 1 : NR_THREAD); | |
} else { | |
panicf("uv_write() fail uv error: %s", UVE(e)); | |
} | |
} | |
/* XXX: In case of IPC client send too soon(need confirmation)? */ | |
ms_sleep(1); | |
} | |
} | |
#endif | |
/** | |
* Called in main thread | |
*/ | |
static void ipc_listen_cb(uv_stream_t *server, int status) | |
{ | |
static size_t i = 0; | |
assert_lt(i, NR_THREAD, "%zu"); | |
assert_eq(server, ipc_server, "%p"); | |
if (status < 0) { | |
LOG_ERR("uv listen error: %s", UVE(status)); | |
return; | |
} | |
int e = uv_pipe_init(server->loop, &ipc_client[i], 0); | |
assert_eqf(e, 0, "%d", "uv_pipe_init() fail uv error: %s", UVE(e)); | |
e = uv_accept(server, (uv_stream_t *) &ipc_client[i]); | |
assert_eqf(e, 0, "%d", "uv_accept() fail uv error: %s", UVE(e)); | |
if (++i == NR_THREAD) { | |
#if USE_STDIO | |
LOG("All IPC clients connected, ready to poll stdin input..."); | |
static poll_ctx_t poll_ctx; | |
e = uv_util_poll_init_start(&poll_ctx, server->loop, STDIN_FILENO, UV_READABLE, stdin_read_cb); | |
assert_eqf(e, 0, "%d", "uv_util_poll_init_start() fail uv error: %s", UVE(e)); | |
#else | |
LOG("All IPC clients connected"); | |
ipc_echo(); | |
#endif | |
} | |
} | |
int test_main(void) | |
{ | |
LOG("%s", uv_version_string()); | |
uv_loop_t *main_loop = uv_default_loop(); | |
assert_nonnull(main_loop); | |
int e = uv_pipe_init(main_loop, ipc_server, 0); | |
assert_eqf(e, 0, "%d", "uv_pipe_init() fail uv error: %s", UVE(e)); | |
uv_fs_t *fs = &(uv_fs_t){}; | |
e = uv_fs_unlink(main_loop, fs, IPC_PATH, NULL); | |
if (e != 0 && e != UV_ENOENT) { | |
panicf("uv_fs_unlink() fail uv error: %s", UVE(e)); | |
} | |
e = uv_pipe_bind(ipc_server, IPC_PATH); | |
assert_eqf(e, 0, "%d", "uv_pipe_bind() fail uv error: %s", UVE(e)); | |
e = uv_listen((uv_stream_t *) ipc_server, SOMAXCONN, ipc_listen_cb); | |
assert_eqf(e, 0, "%d", "uv_listen() fail uv error: %s", UVE(e)); | |
thread_init(); | |
/* | |
* [sic] | |
* Returns non-zero if uv_stop() was called and there are still active handles or requests. | |
* Returns zero in all other cases. | |
*/ | |
e = uv_run(main_loop, UV_RUN_DEFAULT); | |
assert_eqf(e, 0, "%d", "uv_run() fail uv error: %s", UVE(e)); | |
LOG("Main thread main_loop terminated, join threads..."); | |
thread_join(); | |
e = uv_loop_close(main_loop); | |
assert_eqf(e, 0, "%d", "uv_loop_close() fail uv error: %s", UVE(e)); | |
#if UV_VER_REQ(>=, 1, 38, 0) | |
/* see: libuv/test/task.h#MAKE_VALGRIND_HAPPY() */ | |
uv_library_shutdown(); | |
#endif | |
return 0; | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Updated uv_pipe_ipc_echo.c, which works properly; tested under Ubuntu 18.04 and 20.04 LTS.