Created
July 9, 2011 07:26
-
-
Save igorzi/1073416 to your computer and use it in GitHub Desktop.
pipes
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
From 4b9eaed4760c1f25d41bf8431b53bbf7f085b787 Mon Sep 17 00:00:00 2001 | |
From: Igor Zinkovsky <igorzi@microsoft.com> | |
Date: Fri, 1 Jul 2011 17:54:17 -0700 | |
Subject: [PATCH] Named pipes implementation for Windows | |
--- | |
include/uv-unix.h | 5 + | |
include/uv-win.h | 64 ++++- | |
include/uv.h | 26 ++- | |
src/uv-unix.c | 19 ++ | |
src/uv-win.c | 748 ++++++++++++++++++++++++++++++++++++++++++++++-- | |
test/benchmark-list.h | 28 ++- | |
test/benchmark-pump.c | 151 ++++++++--- | |
test/benchmark-sizes.c | 1 + | |
test/echo-server.c | 119 ++++++-- | |
test/task.h | 11 + | |
test/test-list.h | 28 ++- | |
test/test-ping-pong.c | 80 ++++-- | |
12 files changed, 1135 insertions(+), 145 deletions(-) | |
diff --git a/include/uv-unix.h b/include/uv-unix.h | |
index f68a7fd..38ecca9 100644 | |
--- a/include/uv-unix.h | |
+++ b/include/uv-unix.h | |
@@ -73,6 +73,11 @@ typedef struct { | |
ev_io write_watcher; \ | |
ngx_queue_t write_queue; \ | |
ngx_queue_t write_completed_queue; | |
+ | |
+ | |
+/* | |
+ * UV_NAMED_PIPE: pipes carry no private typedef or fields on Unix yet. | |
+ * Both macros are intentionally empty and must each end on their own line; | |
+ * a trailing backslash here would splice the following #define into the | |
+ * macro body and leave that name undefined. | |
+ */ | |
+#define UV_PIPE_PRIVATE_TYPEDEF | |
+#define UV_PIPE_PRIVATE_FIELDS | |
+ | |
/* UV_PREPARE */ \ | |
diff --git a/include/uv-win.h b/include/uv-win.h | |
index e6254fe..ee29e8c 100644 | |
--- a/include/uv-win.h | |
+++ b/include/uv-win.h | |
@@ -31,6 +31,8 @@ | |
#include "tree.h" | |
+#define MAX_PIPENAME_LEN 256 | |
+ | |
/** | |
* It should be possible to cast uv_buf_t[] to WSABUF[] | |
* see http://msdn.microsoft.com/en-us/library/ms741542(v=vs.85).aspx | |
@@ -40,6 +42,17 @@ typedef struct uv_buf_t { | |
char* base; | |
} uv_buf_t; | |
+/* | |
+ * Private uv_pipe_instance state. | |
+ * | |
+ * DISCONNECTED - close_pipe has closed the instance handle. | |
+ * PENDING - the instance was created by uv_pipe_listen, or was disconnected | |
+ * by close_pipe; uv_pipe_queue_accept has not yet issued ConnectNamedPipe. | |
+ * WAITING - ConnectNamedPipe has been issued and the accept request is | |
+ * outstanding. | |
+ * ACCEPTED - the accept request completed; the instance is parked in the | |
+ * server's acceptConnection until uv_accept is called. | |
+ * ACTIVE - the instance belongs to a connection handle (set by | |
+ * uv_pipe_accept and uv_pipe_connect). | |
+ */ | |
+typedef enum { | |
+ UV_PIPEINSTANCE_DISCONNECTED = 0, | |
+ UV_PIPEINSTANCE_PENDING, | |
+ UV_PIPEINSTANCE_WAITING, | |
+ UV_PIPEINSTANCE_ACCEPTED, | |
+ UV_PIPEINSTANCE_ACTIVE | |
+} uv_pipeinstance_state; | |
+ | |
#define UV_REQ_PRIVATE_FIELDS \ | |
union { \ | |
/* Used by I/O operations */ \ | |
@@ -51,31 +64,56 @@ typedef struct uv_buf_t { | |
int flags; \ | |
uv_err_t error; \ | |
struct uv_req_s* next_req; | |
+ | |
+#define uv_stream_connection_fields \ | |
+ unsigned int write_reqs_pending; \ | |
+ uv_req_t* shutdown_req; | |
+ | |
+#define uv_stream_server_fields \ | |
+ uv_connection_cb connection_cb; | |
#define UV_STREAM_PRIVATE_FIELDS \ | |
+ unsigned int reqs_pending; \ | |
uv_alloc_cb alloc_cb; \ | |
uv_read_cb read_cb; \ | |
struct uv_req_s read_req; \ | |
- | |
-#define uv_tcp_connection_fields \ | |
- unsigned int write_reqs_pending; \ | |
- uv_req_t* shutdown_req; | |
- | |
-#define uv_tcp_server_fields \ | |
- uv_connection_cb connection_cb; \ | |
- SOCKET accept_socket; \ | |
- struct uv_req_s accept_req; \ | |
- char accept_buffer[sizeof(struct sockaddr_storage) * 2 + 32]; | |
+ union { \ | |
+ struct { uv_stream_connection_fields }; \ | |
+ struct { uv_stream_server_fields }; \ | |
+ }; | |
#define UV_TCP_PRIVATE_FIELDS \ | |
- unsigned int reqs_pending; \ | |
union { \ | |
SOCKET socket; \ | |
HANDLE handle; \ | |
}; \ | |
+ SOCKET accept_socket; \ | |
+ char accept_buffer[sizeof(struct sockaddr_storage) * 2 + 32]; \ | |
+ struct uv_req_s accept_req; | |
+ | |
+#define uv_pipe_server_fields \ | |
+ char* name; \ | |
+ int connectionCount; \ | |
+ uv_pipe_instance_t* connections; \ | |
+ uv_pipe_instance_t* acceptConnection; \ | |
+ uv_pipe_instance_t connectionsBuffer[4]; | |
+ | |
+#define uv_pipe_connection_fields \ | |
+ uv_pipe_t* server; \ | |
+ uv_pipe_instance_t* connection; \ | |
+ uv_pipe_instance_t clientConnection; | |
+ | |
+#define UV_PIPE_PRIVATE_TYPEDEF \ | |
+ typedef struct uv_pipe_instance_s { \ | |
+ HANDLE handle; \ | |
+ uv_pipeinstance_state state; \ | |
+ uv_req_t accept_req; \ | |
+ } uv_pipe_instance_t; | |
+ | |
+#define UV_PIPE_PRIVATE_FIELDS \ | |
union { \ | |
- struct { uv_tcp_connection_fields }; \ | |
- struct { uv_tcp_server_fields }; \ | |
+ struct { uv_pipe_server_fields }; \ | |
+ struct { uv_pipe_connection_fields }; \ | |
}; | |
#define UV_TIMER_PRIVATE_FIELDS \ | |
diff --git a/include/uv.h b/include/uv.h | |
index 6aecb28..5e0c346 100644 | |
--- a/include/uv.h | |
+++ b/include/uv.h | |
@@ -43,6 +43,7 @@ typedef struct uv_err_s uv_err_t; | |
typedef struct uv_handle_s uv_handle_t; | |
typedef struct uv_stream_s uv_stream_t; | |
typedef struct uv_tcp_s uv_tcp_t; | |
+typedef struct uv_pipe_s uv_pipe_t; | |
typedef struct uv_timer_s uv_timer_t; | |
typedef struct uv_prepare_s uv_prepare_t; | |
typedef struct uv_check_s uv_check_t; | |
@@ -124,7 +125,8 @@ typedef enum { | |
UV_EAIFAMNOSUPPORT, | |
UV_EAINONAME, | |
UV_EAISERVICE, | |
- UV_EAISOCKTYPE | |
+ UV_EAISOCKTYPE, | |
+ UV_ESHUTDOWN | |
} uv_err_code; | |
typedef enum { | |
@@ -288,6 +290,26 @@ int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb); | |
/* | |
+ * A subclass of uv_stream_t representing a pipe stream or pipe server. | |
+ */ | |
+UV_PIPE_PRIVATE_TYPEDEF | |
+ | |
+struct uv_pipe_s { | |
+ UV_HANDLE_FIELDS | |
+ UV_STREAM_FIELDS | |
+ UV_PIPE_PRIVATE_FIELDS | |
+}; | |
+ | |
+int uv_pipe_init(uv_pipe_t* handle); | |
+ | |
+int uv_pipe_create(uv_pipe_t* handle, char* name); | |
+ | |
+int uv_pipe_listen(uv_pipe_t* handle, int instanceCount, uv_connection_cb cb); | |
+ | |
+int uv_pipe_connect(uv_req_t* req, char* name); | |
+ | |
+ | |
+/* | |
* Subclass of uv_handle_t. libev wrapper. Every active prepare handle gets | |
* its callback called exactly once per loop iteration, just before the | |
* system blocks to wait for completed i/o. | |
@@ -478,7 +500,9 @@ union uv_any_handle { | |
typedef struct { | |
uint64_t req_init; | |
uint64_t handle_init; | |
+ uint64_t stream_init; | |
uint64_t tcp_init; | |
+ uint64_t pipe_init; | |
uint64_t prepare_init; | |
uint64_t check_init; | |
uint64_t idle_init; | |
diff --git a/src/uv-unix.c b/src/uv-unix.c | |
index 786657b..6bffaf2 100644 | |
--- a/src/uv-unix.c | |
+++ b/src/uv-unix.c | |
@@ -1594,3 +1594,22 @@ int uv_getaddrinfo(uv_getaddrinfo_t* handle, | |
return 0; | |
} | |
+ | |
+/* Not implemented on Unix yet: traps in debug builds, fails with -1 when */ | |
+/* assertions are compiled out (previously fell off the end of the function). */ | |
+int uv_pipe_init(uv_pipe_t* handle) { | |
+  assert(0 && "implement me"); | |
+  return -1; | |
+} | |
+ | |
+ | |
+/* Not implemented on Unix yet: traps in debug builds, fails with -1 when */ | |
+/* assertions are compiled out (previously fell off the end of the function). */ | |
+int uv_pipe_create(uv_pipe_t* handle, char* name) { | |
+  assert(0 && "implement me"); | |
+  return -1; | |
+} | |
+ | |
+ | |
+/* Not implemented on Unix yet: traps in debug builds, fails with -1 when */ | |
+/* assertions are compiled out (previously fell off the end of the function). */ | |
+int uv_pipe_listen(uv_pipe_t* handle, int instanceCount, uv_connection_cb cb) { | |
+  assert(0 && "implement me"); | |
+  return -1; | |
+} | |
+ | |
+ | |
+/* Not implemented on Unix yet: traps in debug builds, fails with -1 when */ | |
+/* assertions are compiled out (previously fell off the end of the function). */ | |
+int uv_pipe_connect(uv_req_t* req, char* name) { | |
+  assert(0 && "implement me"); | |
+  return -1; | |
+} | |
diff --git a/src/uv-win.c b/src/uv-win.c | |
index b92eb36..a27bc92 100644 | |
--- a/src/uv-win.c | |
+++ b/src/uv-win.c | |
@@ -164,6 +164,7 @@ static LPFN_TRANSMITFILE pTransmitFile6; | |
#define UV_HANDLE_ENDGAME_QUEUED 0x0400 | |
#define UV_HANDLE_BIND_ERROR 0x1000 | |
#define UV_HANDLE_IPV6 0x2000 | |
+#define UV_HANDLE_PIPESERVER 0x4000 | |
/* | |
* Private uv_req flags. | |
@@ -242,6 +243,8 @@ void uv_ares_process(uv_ares_action_t* handle, uv_req_t* req); | |
void uv_ares_task_cleanup(uv_ares_task_t* handle, uv_req_t* req); | |
void uv_ares_poll(uv_timer_t* handle, int status); | |
+static void close_pipe(uv_pipe_t* handle, int* status, uv_err_t* err); | |
+ | |
/* memory used per ares_channel */ | |
struct uv_ares_channel_s { | |
ares_channel channel; | |
@@ -373,6 +376,7 @@ static uv_err_code uv_translate_sys_error(int sys_errno) { | |
case ERROR_INVALID_FLAGS: return UV_EBADF; | |
case ERROR_INVALID_PARAMETER: return UV_EINVAL; | |
case ERROR_NO_UNICODE_TRANSLATION: return UV_ECHARSET; | |
+ case ERROR_BROKEN_PIPE: return UV_EOF; | |
default: return UV_UNKNOWN; | |
} | |
} | |
@@ -527,6 +531,11 @@ static uv_req_t* uv_overlapped_to_req(OVERLAPPED* overlapped) { | |
} | |
+/* Maps an accept request back to the pipe instance that embeds it. */ | |
+/* Only valid for requests that are the accept_req member of a */ | |
+/* uv_pipe_instance_t; any other request yields a bogus pointer. */ | |
+static uv_pipe_instance_t* uv_req_to_pipeinstance(uv_req_t* req) { | |
+ return CONTAINING_RECORD(req, uv_pipe_instance_t, accept_req); | |
+} | |
+ | |
+ | |
static void uv_insert_pending_req(uv_req_t* req) { | |
req->next_req = NULL; | |
if (uv_pending_reqs_tail_) { | |
@@ -594,24 +603,20 @@ static int uv_tcp_set_socket(uv_tcp_t* handle, SOCKET socket) { | |
} | |
-static void uv_tcp_init_connection(uv_tcp_t* handle) { | |
+static void uv_init_connection(uv_stream_t* handle) { | |
handle->flags |= UV_HANDLE_CONNECTION; | |
handle->write_reqs_pending = 0; | |
uv_req_init(&(handle->read_req), (uv_handle_t*)handle, NULL); | |
} | |
-int uv_tcp_init(uv_tcp_t* handle) { | |
- handle->socket = INVALID_SOCKET; | |
+int uv_stream_init(uv_stream_t* handle) { | |
handle->write_queue_size = 0; | |
- handle->type = UV_TCP; | |
handle->flags = 0; | |
- handle->reqs_pending = 0; | |
handle->error = uv_ok_; | |
- handle->accept_socket = INVALID_SOCKET; | |
uv_counters()->handle_init++; | |
- uv_counters()->tcp_init++; | |
+ uv_counters()->stream_init++; | |
uv_refs_++; | |
@@ -619,6 +624,20 @@ int uv_tcp_init(uv_tcp_t* handle) { | |
} | |
+/* Initializes a TCP handle: shared stream state first, then the TCP-only */ | |
+/* fields. Always returns 0. */ | |
+int uv_tcp_init(uv_tcp_t* handle) { | |
+  uv_stream_init((uv_stream_t*)handle); | |
+ | |
+  handle->type = UV_TCP; | |
+  handle->reqs_pending = 0; | |
+ | |
+  /* No socket is attached yet, neither for I/O nor for a queued accept. */ | |
+  handle->socket = INVALID_SOCKET; | |
+  handle->accept_socket = INVALID_SOCKET; | |
+ | |
+  uv_counters()->tcp_init++; | |
+  return 0; | |
+} | |
+ | |
+ | |
static void uv_tcp_endgame(uv_tcp_t* handle) { | |
uv_err_t err; | |
int status; | |
@@ -658,6 +677,39 @@ static void uv_tcp_endgame(uv_tcp_t* handle) { | |
} | |
+/* | |
+ * Runs the deferred shutdown and close steps for a pipe handle. | |
+ * | |
+ * Shutdown: once the handle is SHUTTING, not yet SHUT, and has no writes in | |
+ * flight, the pipe is closed via close_pipe (which sets UV_HANDLE_SHUT) and | |
+ * the shutdown callback, if any, is invoked with the resulting status. | |
+ * | |
+ * Close: once the handle is CLOSING and no requests remain, it is marked | |
+ * CLOSED, the close callback is invoked and the loop reference is dropped. | |
+ */ | |
+static void uv_pipe_endgame(uv_pipe_t* handle) { | |
+  /* close_pipe does not write these on every path, so start from success. */ | |
+  uv_err_t err = uv_ok_; | |
+  int status = 0; | |
+ | |
+  if (handle->flags & UV_HANDLE_SHUTTING && | |
+      !(handle->flags & UV_HANDLE_SHUT) && | |
+      handle->write_reqs_pending == 0) { | |
+    close_pipe(handle, &status, &err); | |
+ | |
+    if (handle->shutdown_req->cb) { | |
+      handle->shutdown_req->flags &= ~UV_REQ_PENDING; | |
+      if (status == -1) { | |
+        uv_last_error_ = err; | |
+      } | |
+      ((uv_shutdown_cb)handle->shutdown_req->cb)(handle->shutdown_req, status); | |
+    } | |
+    /* The shutdown request counted as pending since uv_shutdown. */ | |
+    handle->reqs_pending--; | |
+  } | |
+ | |
+  if (handle->flags & UV_HANDLE_CLOSING && | |
+      handle->reqs_pending == 0) { | |
+    assert(!(handle->flags & UV_HANDLE_CLOSED)); | |
+    handle->flags |= UV_HANDLE_CLOSED; | |
+ | |
+    if (handle->close_cb) { | |
+      handle->close_cb((uv_handle_t*)handle); | |
+    } | |
+ | |
+    uv_refs_--; | |
+  } | |
+} | |
+ | |
+ | |
static void uv_timer_endgame(uv_timer_t* handle) { | |
if (handle->flags & UV_HANDLE_CLOSING) { | |
assert(!(handle->flags & UV_HANDLE_CLOSED)); | |
@@ -714,6 +766,10 @@ static void uv_process_endgames() { | |
case UV_TCP: | |
uv_tcp_endgame((uv_tcp_t*)handle); | |
break; | |
+ | |
+ case UV_NAMED_PIPE: | |
+ uv_pipe_endgame((uv_pipe_t*)handle); | |
+ break; | |
case UV_TIMER: | |
uv_timer_endgame((uv_timer_t*)handle); | |
@@ -749,6 +805,7 @@ static void uv_want_endgame(uv_handle_t* handle) { | |
static int uv_close_error(uv_handle_t* handle, uv_err_t e) { | |
uv_tcp_t* tcp; | |
+ uv_pipe_t* pipe; | |
if (handle->flags & UV_HANDLE_CLOSING) { | |
return 0; | |
@@ -767,6 +824,15 @@ static int uv_close_error(uv_handle_t* handle, uv_err_t e) { | |
uv_want_endgame(handle); | |
} | |
return 0; | |
+ | |
+ case UV_NAMED_PIPE: | |
+ pipe = (uv_pipe_t*)handle; | |
+ pipe->flags &= ~(UV_HANDLE_READING | UV_HANDLE_LISTENING); | |
+ close_pipe(pipe, NULL, NULL); | |
+ if (pipe->reqs_pending == 0) { | |
+ uv_want_endgame(handle); | |
+ } | |
+ return 0; | |
case UV_TIMER: | |
uv_timer_stop((uv_timer_t*)handle); | |
@@ -871,7 +937,7 @@ int uv_tcp_bind6(uv_tcp_t* handle, struct sockaddr_in6 addr) { | |
} | |
-static void uv_queue_accept(uv_tcp_t* handle) { | |
+static void uv_tcp_queue_accept(uv_tcp_t* handle) { | |
uv_req_t* req; | |
BOOL success; | |
DWORD bytes; | |
@@ -902,6 +968,7 @@ static void uv_queue_accept(uv_tcp_t* handle) { | |
if (accept_socket == INVALID_SOCKET) { | |
req->error = uv_new_sys_error(WSAGetLastError()); | |
uv_insert_pending_req(req); | |
+ handle->reqs_pending++; | |
return; | |
} | |
@@ -921,6 +988,7 @@ static void uv_queue_accept(uv_tcp_t* handle) { | |
/* Make this req pending reporting an error. */ | |
req->error = uv_new_sys_error(WSAGetLastError()); | |
uv_insert_pending_req(req); | |
+ handle->reqs_pending++; | |
/* Destroy the preallocated client socket. */ | |
closesocket(accept_socket); | |
return; | |
@@ -933,7 +1001,48 @@ static void uv_queue_accept(uv_tcp_t* handle) { | |
} | |
-static void uv_queue_read(uv_tcp_t* handle) { | |
+/* | |
+ * Issues an overlapped ConnectNamedPipe for every instance of a listening | |
+ * pipe server that is in the PENDING state. | |
+ * | |
+ * Outcomes per instance: | |
+ * - the call is in flight (or succeeded): the instance becomes WAITING and | |
+ *   the completion is expected to arrive through the completion port; | |
+ * - ERROR_PIPE_CONNECTED: a client connected before the call. The call | |
+ *   returned without starting an operation, so the request is completed | |
+ *   by hand through the pending-request queue; | |
+ * - any other error: the request is queued carrying that error. | |
+ * | |
+ * NOTE(review): on the error path the instance stays PENDING while its | |
+ * request sits in the pending queue; a re-entrant call before the queue is | |
+ * drained would re-initialize and re-insert the same request - confirm this | |
+ * cannot happen. | |
+ * | |
+ * TODO: Make this faster (we could maintain a linked list of pending | |
+ * instances). | |
+ */ | |
+static void uv_pipe_queue_accept(uv_pipe_t* handle) { | |
+  uv_req_t* req; | |
+  uv_pipe_instance_t* instance; | |
+  DWORD sysError; | |
+  int i; | |
+ | |
+  assert(handle->flags & UV_HANDLE_LISTENING); | |
+ | |
+  for (i = 0; i < handle->connectionCount; i++) { | |
+    instance = &handle->connections[i]; | |
+ | |
+    if (instance->state != UV_PIPEINSTANCE_PENDING) { | |
+      continue; | |
+    } | |
+ | |
+    /* Prepare the uv_req structure. */ | |
+    req = &instance->accept_req; | |
+    uv_req_init(req, (uv_handle_t*)handle, NULL); | |
+    assert(!(req->flags & UV_REQ_PENDING)); | |
+    req->type = UV_ACCEPT; | |
+    req->flags |= UV_REQ_PENDING; | |
+ | |
+    /* Prepare the overlapped structure. */ | |
+    memset(&(req->overlapped), 0, sizeof(req->overlapped)); | |
+ | |
+    if (!ConnectNamedPipe(instance->handle, &req->overlapped)) { | |
+      /* Read the error once; later calls could overwrite it. */ | |
+      sysError = GetLastError(); | |
+ | |
+      if (sysError == ERROR_PIPE_CONNECTED) { | |
+        /* Already connected: report success ourselves. */ | |
+        req->error = uv_ok_; | |
+        instance->state = UV_PIPEINSTANCE_WAITING; | |
+        uv_insert_pending_req(req); | |
+        handle->reqs_pending++; | |
+        continue; | |
+      } | |
+ | |
+      if (sysError != ERROR_IO_PENDING) { | |
+        /* Make this req pending reporting an error. */ | |
+        req->error = uv_new_sys_error(sysError); | |
+        uv_insert_pending_req(req); | |
+        handle->reqs_pending++; | |
+        continue; | |
+      } | |
+    } | |
+ | |
+    instance->state = UV_PIPEINSTANCE_WAITING; | |
+    handle->reqs_pending++; | |
+  } | |
+} | |
+ | |
+ | |
+static void uv_tcp_queue_read(uv_tcp_t* handle) { | |
uv_req_t* req; | |
uv_buf_t buf; | |
int result; | |
@@ -961,6 +1070,40 @@ static void uv_queue_read(uv_tcp_t* handle) { | |
/* Make this req pending reporting an error. */ | |
req->error = uv_new_sys_error(WSAGetLastError()); | |
uv_insert_pending_req(req); | |
+ handle->reqs_pending++; | |
+ return; | |
+ } | |
+ | |
+ req->flags |= UV_REQ_PENDING; | |
+ handle->reqs_pending++; | |
+} | |
+ | |
+ | |
+static void uv_pipe_queue_read(uv_pipe_t* handle) { | |
+ uv_req_t* req; | |
+ int result; | |
+ | |
+ assert(handle->flags & UV_HANDLE_READING); | |
+ assert(handle->connection); | |
+ assert(handle->connection->handle != INVALID_HANDLE_VALUE); | |
+ | |
+ req = &handle->read_req; | |
+ assert(!(req->flags & UV_REQ_PENDING)); | |
+ memset(&req->overlapped, 0, sizeof(req->overlapped)); | |
+ req->type = UV_READ; | |
+ | |
+ /* Do 0-read */ | |
+ result = ReadFile(handle->connection->handle, | |
+ &uv_zero_, | |
+ 0, | |
+ NULL, | |
+ &req->overlapped); | |
+ | |
+ if (!result && GetLastError() != ERROR_IO_PENDING) { | |
+ /* Make this req pending reporting an error. */ | |
+ req->error = uv_new_sys_error(WSAGetLastError()); | |
+ uv_insert_pending_req(req); | |
+ handle->reqs_pending++; | |
return; | |
} | |
@@ -993,40 +1136,76 @@ int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) { | |
handle->connection_cb = cb; | |
uv_req_init(&(handle->accept_req), (uv_handle_t*)handle, NULL); | |
- uv_queue_accept(handle); | |
+ uv_tcp_queue_accept(handle); | |
return 0; | |
} | |
-int uv_accept(uv_handle_t* server, uv_stream_t* client) { | |
+static int uv_tcp_accept(uv_tcp_t* server, uv_tcp_t* client) { | |
int rv = 0; | |
- uv_tcp_t* tcpServer = (uv_tcp_t*)server; | |
- uv_tcp_t* tcpClient = (uv_tcp_t*)client; | |
- if (tcpServer->accept_socket == INVALID_SOCKET) { | |
+ if (server->accept_socket == INVALID_SOCKET) { | |
uv_set_sys_error(WSAENOTCONN); | |
return -1; | |
} | |
- if (uv_tcp_set_socket(tcpClient, tcpServer->accept_socket) == -1) { | |
- closesocket(tcpServer->accept_socket); | |
+ if (uv_tcp_set_socket(client, server->accept_socket) == -1) { | |
+ closesocket(server->accept_socket); | |
rv = -1; | |
} else { | |
- uv_tcp_init_connection(tcpClient); | |
+ uv_init_connection((uv_stream_t*)client); | |
} | |
- tcpServer->accept_socket = INVALID_SOCKET; | |
+ server->accept_socket = INVALID_SOCKET; | |
- if (!(tcpServer->flags & UV_HANDLE_CLOSING)) { | |
- uv_queue_accept(tcpServer); | |
+ if (!(server->flags & UV_HANDLE_CLOSING)) { | |
+ uv_tcp_queue_accept(server); | |
} | |
return rv; | |
} | |
-int uv_read_start(uv_stream_t* handle, uv_alloc_cb alloc_cb, uv_read_cb read_cb) { | |
+/* | |
+ * Hands the server's accepted pipe instance over to `client`. | |
+ * | |
+ * Returns 0 on success. Returns -1 with the last error set to WSAENOTCONN | |
+ * when the server has no accepted instance waiting, mirroring | |
+ * uv_tcp_accept. After the hand-over the server re-arms its pending | |
+ * instances unless it is closing. | |
+ */ | |
+static int uv_pipe_accept(uv_pipe_t* server, uv_pipe_t* client) { | |
+  if (!server->acceptConnection) { | |
+    uv_set_sys_error(WSAENOTCONN); | |
+    return -1; | |
+  } | |
+ | |
+  /* Make the connection instance active */ | |
+  server->acceptConnection->state = UV_PIPEINSTANCE_ACTIVE; | |
+ | |
+  /* Move the connection instance from server to client */ | |
+  client->connection = server->acceptConnection; | |
+  server->acceptConnection = NULL; | |
+ | |
+  /* Remember the server */ | |
+  client->server = server; | |
+ | |
+  uv_init_connection((uv_stream_t*)client); | |
+  /* Marks the client as a server-side connection, so that close_pipe */ | |
+  /* disconnects the instance instead of closing its handle. */ | |
+  client->flags |= UV_HANDLE_PIPESERVER; | |
+  /* NOTE(review): uv_init_connection already initializes read_req; kept */ | |
+  /* to preserve existing behavior. */ | |
+  uv_req_init(&(client->read_req), (uv_handle_t*)client, NULL); | |
+ | |
+  if (!(server->flags & UV_HANDLE_CLOSING)) { | |
+    uv_pipe_queue_accept(server); | |
+  } | |
+ | |
+  return 0; | |
+} | |
+ | |
+ | |
+/* Accepts a queued connection on `server` into `client`, dispatching on the */ | |
+/* handle type. Both handles must be of the same type. Returns -1 for handle */ | |
+/* types that cannot accept. */ | |
+int uv_accept(uv_handle_t* server, uv_stream_t* client) { | |
+  assert(client->type == server->type); | |
+ | |
+  switch (server->type) { | |
+    case UV_TCP: | |
+      return uv_tcp_accept((uv_tcp_t*)server, (uv_tcp_t*)client); | |
+ | |
+    case UV_NAMED_PIPE: | |
+      return uv_pipe_accept((uv_pipe_t*)server, (uv_pipe_t*)client); | |
+ | |
+    default: | |
+      return -1; | |
+  } | |
+} | |
+ | |
+ | |
+static int uv_tcp_read_start(uv_tcp_t* handle, uv_alloc_cb alloc_cb, uv_read_cb read_cb) { | |
if (!(handle->flags & UV_HANDLE_CONNECTION)) { | |
uv_set_sys_error(WSAEINVAL); | |
return -1; | |
@@ -1049,12 +1228,52 @@ int uv_read_start(uv_stream_t* handle, uv_alloc_cb alloc_cb, uv_read_cb read_cb) | |
/* If reading was stopped and then started again, there could stell be a */ | |
/* read request pending. */ | |
if (!(handle->read_req.flags & UV_REQ_PENDING)) | |
- uv_queue_read((uv_tcp_t*)handle); | |
+ uv_tcp_queue_read(handle); | |
return 0; | |
} | |
+/* | |
+ * Starts reading from a pipe connection. | |
+ * | |
+ * Returns 0 on success, or -1 with the last error set to: | |
+ *   UV_EINVAL   - the handle is not a connection; | |
+ *   UV_EALREADY - the handle is already reading; | |
+ *   UV_EOF      - the end of the stream was already reached. | |
+ * | |
+ * The error codes are stored directly: uv_set_sys_error expects a system | |
+ * error number and would mistranslate a uv_err_code value. | |
+ */ | |
+static int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb, uv_read_cb read_cb) { | |
+  if (!(handle->flags & UV_HANDLE_CONNECTION)) { | |
+    uv_last_error_.code = UV_EINVAL; | |
+    uv_last_error_.sys_errno_ = ERROR_SUCCESS; | |
+    return -1; | |
+  } | |
+ | |
+  if (handle->flags & UV_HANDLE_READING) { | |
+    uv_last_error_.code = UV_EALREADY; | |
+    uv_last_error_.sys_errno_ = ERROR_SUCCESS; | |
+    return -1; | |
+  } | |
+ | |
+  if (handle->flags & UV_HANDLE_EOF) { | |
+    uv_last_error_.code = UV_EOF; | |
+    uv_last_error_.sys_errno_ = ERROR_SUCCESS; | |
+    return -1; | |
+  } | |
+ | |
+  handle->flags |= UV_HANDLE_READING; | |
+  handle->read_cb = read_cb; | |
+  handle->alloc_cb = alloc_cb; | |
+ | |
+  /* If reading was stopped and then started again, there could still be a */ | |
+  /* read request pending. */ | |
+  if (!(handle->read_req.flags & UV_REQ_PENDING)) { | |
+    uv_pipe_queue_read(handle); | |
+  } | |
+ | |
+  return 0; | |
+} | |
+ | |
+ | |
+/* Starts reading on a stream, dispatching on the handle type. Returns -1 */ | |
+/* for handle types that cannot be read from. */ | |
+int uv_read_start(uv_stream_t* handle, uv_alloc_cb alloc_cb, uv_read_cb read_cb) { | |
+  switch (handle->type) { | |
+    case UV_TCP: | |
+      return uv_tcp_read_start((uv_tcp_t*)handle, alloc_cb, read_cb); | |
+ | |
+    case UV_NAMED_PIPE: | |
+      return uv_pipe_read_start((uv_pipe_t*)handle, alloc_cb, read_cb); | |
+ | |
+    default: | |
+      return -1; | |
+  } | |
+} | |
+ | |
+ | |
int uv_read_stop(uv_stream_t* handle) { | |
handle->flags &= ~UV_HANDLE_READING; | |
@@ -1169,7 +1388,7 @@ static size_t uv_count_bufs(uv_buf_t bufs[], int count) { | |
} | |
-int uv_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) { | |
+int uv_tcp_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) { | |
int result; | |
DWORD bytes, err; | |
uv_tcp_t* handle = (uv_tcp_t*) req->handle; | |
@@ -1222,6 +1441,72 @@ int uv_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) { | |
} | |
+/* | |
+ * Queues an overlapped write of a single buffer on a pipe connection. | |
+ * | |
+ * Returns 0 when the write completed or was queued; the request is then | |
+ * counted as pending. Returns -1 with the last error set to: | |
+ *   UV_ENOTSUP - bufcnt is not 1 (only single-buffer writes are supported); | |
+ *   UV_EINVAL  - the handle is not a connection; | |
+ *   UV_EOF     - the handle is shutting down; | |
+ *   or the translated system error when WriteFile fails outright. | |
+ * | |
+ * The handle flags are validated before `connection` is touched: that | |
+ * member lives in a union and only has meaning for connection handles. | |
+ */ | |
+int uv_pipe_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) { | |
+  int result; | |
+  uv_pipe_t* handle = (uv_pipe_t*) req->handle; | |
+ | |
+  assert(!(req->flags & UV_REQ_PENDING)); | |
+ | |
+  if (bufcnt != 1) { | |
+    uv_last_error_.code = UV_ENOTSUP; | |
+    uv_last_error_.sys_errno_ = ERROR_SUCCESS; | |
+    return -1; | |
+  } | |
+ | |
+  if (!(handle->flags & UV_HANDLE_CONNECTION)) { | |
+    uv_last_error_.code = UV_EINVAL; | |
+    uv_last_error_.sys_errno_ = ERROR_SUCCESS; | |
+    return -1; | |
+  } | |
+ | |
+  if (handle->flags & UV_HANDLE_SHUTTING) { | |
+    uv_last_error_.code = UV_EOF; | |
+    uv_last_error_.sys_errno_ = ERROR_SUCCESS; | |
+    return -1; | |
+  } | |
+ | |
+  assert(handle->connection); | |
+  assert(handle->connection->handle != INVALID_HANDLE_VALUE); | |
+ | |
+  memset(&req->overlapped, 0, sizeof(req->overlapped)); | |
+  req->type = UV_WRITE; | |
+ | |
+  result = WriteFile(handle->connection->handle, | |
+                     bufs[0].base, | |
+                     bufs[0].len, | |
+                     NULL, | |
+                     &req->overlapped); | |
+ | |
+  if (!result && GetLastError() != ERROR_IO_PENDING) { | |
+    uv_set_sys_error(GetLastError()); | |
+    return -1; | |
+  } | |
+ | |
+  if (result) { | |
+    /* Request completed immediately. */ | |
+    req->queued_bytes = 0; | |
+  } else { | |
+    /* Request queued by the kernel. */ | |
+    req->queued_bytes = uv_count_bufs(bufs, bufcnt); | |
+    handle->write_queue_size += req->queued_bytes; | |
+  } | |
+ | |
+  req->flags |= UV_REQ_PENDING; | |
+  handle->reqs_pending++; | |
+  handle->write_reqs_pending++; | |
+ | |
+  return 0; | |
+} | |
+ | |
+ | |
+/* Writes buffers to the stream the request is bound to, dispatching on the */ | |
+/* handle type. Returns -1 for handle types that cannot be written to. */ | |
+int uv_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) { | |
+  switch (req->handle->type) { | |
+    case UV_TCP: | |
+      return uv_tcp_write(req, bufs, bufcnt); | |
+ | |
+    case UV_NAMED_PIPE: | |
+      return uv_pipe_write(req, bufs, bufcnt); | |
+ | |
+    default: | |
+      return -1; | |
+  } | |
+} | |
+ | |
+ | |
int uv_shutdown(uv_req_t* req) { | |
uv_tcp_t* handle = (uv_tcp_t*) req->handle; | |
int status = 0; | |
@@ -1240,7 +1525,7 @@ int uv_shutdown(uv_req_t* req) { | |
req->flags |= UV_REQ_PENDING; | |
handle->flags |= UV_HANDLE_SHUTTING; | |
- handle->shutdown_req = req; | |
+ handle->shutdown_req = req; | |
handle->reqs_pending++; | |
uv_want_endgame((uv_handle_t*)handle); | |
@@ -1332,7 +1617,7 @@ static void uv_tcp_return_req(uv_tcp_t* handle, uv_req_t* req) { | |
} | |
/* Post another 0-read if still reading and not closing. */ | |
if (handle->flags & UV_HANDLE_READING) { | |
- uv_queue_read(handle); | |
+ uv_tcp_queue_read(handle); | |
} | |
break; | |
@@ -1369,7 +1654,7 @@ static void uv_tcp_return_req(uv_tcp_t* handle, uv_req_t* req) { | |
/* uv_queue_accept will detect it. */ | |
closesocket(handle->accept_socket); | |
if (handle->flags & UV_HANDLE_LISTENING) { | |
- uv_queue_accept(handle); | |
+ uv_tcp_queue_accept(handle); | |
} | |
} | |
break; | |
@@ -1382,7 +1667,7 @@ static void uv_tcp_return_req(uv_tcp_t* handle, uv_req_t* req) { | |
SO_UPDATE_CONNECT_CONTEXT, | |
NULL, | |
0) == 0) { | |
- uv_tcp_init_connection(handle); | |
+ uv_init_connection((uv_stream_t*)handle); | |
((uv_connect_cb)req->cb)(req, 0); | |
} else { | |
uv_set_sys_error(WSAGetLastError()); | |
@@ -1411,6 +1696,161 @@ static void uv_tcp_return_req(uv_tcp_t* handle, uv_req_t* req) { | |
} | |
+/* | |
+ * Processes a completed request that belongs to a pipe handle. | |
+ * | |
+ * UV_WRITE:   updates the write accounting, runs the write callback and | |
+ *             wakes the endgame when the last write of a shutting-down | |
+ *             handle finishes. | |
+ * UV_READ:    the completed request is a 0-byte read that signals data is | |
+ *             available. The pipe is switched to PIPE_NOWAIT, drained with | |
+ *             synchronous reads, switched back to PIPE_WAIT and another | |
+ *             0-read is posted while the handle keeps reading. | |
+ * UV_ACCEPT:  parks the connected instance in acceptConnection and runs | |
+ *             the connection callback; on error listening simply continues. | |
+ * UV_CONNECT: marks the handle as a connection on success and runs the | |
+ *             connect callback. | |
+ * | |
+ * Afterwards the handle's pending count drops by one and the endgame is | |
+ * queued if the handle is closing with nothing left in flight. | |
+ * | |
+ * NOTE(review): when reading stops inside the drain loop (EOF or the | |
+ * callback stopping the read) the pipe is left in PIPE_NOWAIT mode - | |
+ * confirm whether it must be restored before the next uv_read_start. | |
+ */ | |
+static void uv_pipe_return_req(uv_pipe_t* handle, uv_req_t* req) { | |
+  DWORD bytes, err, mode; | |
+  uv_buf_t buf; | |
+ | |
+  assert(handle->type == UV_NAMED_PIPE); | |
+ | |
+  /* Mark the request non-pending */ | |
+  req->flags &= ~UV_REQ_PENDING; | |
+ | |
+  switch (req->type) { | |
+    case UV_WRITE: | |
+      handle->write_queue_size -= req->queued_bytes; | |
+      if (req->cb) { | |
+        uv_last_error_ = req->error; | |
+        ((uv_write_cb)req->cb)(req, uv_last_error_.code == UV_OK ? 0 : -1); | |
+      } | |
+      handle->write_reqs_pending--; | |
+      if (handle->write_reqs_pending == 0 && | |
+          handle->flags & UV_HANDLE_SHUTTING) { | |
+        uv_want_endgame((uv_handle_t*)handle); | |
+      } | |
+      break; | |
+ | |
+    case UV_READ: | |
+      if (req->error.code != UV_OK) { | |
+        /* An error occurred doing the 0-read. */ | |
+        if (!(handle->flags & UV_HANDLE_READING)) { | |
+          break; | |
+        } | |
+ | |
+        /* Stop reading and report error. */ | |
+        handle->flags &= ~UV_HANDLE_READING; | |
+        uv_last_error_ = req->error; | |
+        buf.base = 0; | |
+        buf.len = 0; | |
+        handle->read_cb((uv_stream_t*)handle, -1, buf); | |
+        break; | |
+      } | |
+ | |
+      /* Temporarily switch to non-blocking mode. | |
+       * This is so that ReadFile doesn't block if the read buffer is empty. | |
+       */ | |
+      mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_NOWAIT; | |
+      if (!SetNamedPipeHandleState(handle->connection->handle, &mode, NULL, NULL)) { | |
+        /* We can't continue processing this read. No buffer has been */ | |
+        /* allocated yet, so hand the callback an empty one, and stop */ | |
+        /* reading because no further 0-read will be posted. */ | |
+        err = GetLastError(); | |
+        uv_set_sys_error(err); | |
+        if (handle->flags & UV_HANDLE_READING) { | |
+          handle->flags &= ~UV_HANDLE_READING; | |
+          buf.base = 0; | |
+          buf.len = 0; | |
+          handle->read_cb((uv_stream_t*)handle, -1, buf); | |
+        } | |
+        break; | |
+      } | |
+ | |
+      /* Do non-blocking reads until the buffer is empty */ | |
+      while (handle->flags & UV_HANDLE_READING) { | |
+        buf = handle->alloc_cb((uv_stream_t*)handle, 65536); | |
+        assert(buf.len > 0); | |
+ | |
+        if (ReadFile(handle->connection->handle, | |
+                     buf.base, | |
+                     buf.len, | |
+                     &bytes, | |
+                     NULL)) { | |
+          if (bytes > 0) { | |
+            /* Successful read */ | |
+            handle->read_cb((uv_stream_t*)handle, bytes, buf); | |
+            /* Read again only if bytes == buf.len */ | |
+            if (bytes < buf.len) { | |
+              break; | |
+            } | |
+          } else { | |
+            /* Connection closed */ | |
+            handle->flags &= ~UV_HANDLE_READING; | |
+            handle->flags |= UV_HANDLE_EOF; | |
+            uv_last_error_.code = UV_EOF; | |
+            uv_last_error_.sys_errno_ = ERROR_SUCCESS; | |
+            handle->read_cb((uv_stream_t*)handle, -1, buf); | |
+            break; | |
+          } | |
+        } else { | |
+          err = GetLastError(); | |
+          if (err == ERROR_NO_DATA) { | |
+            /* Read buffer was completely empty, report a 0-byte read. */ | |
+            /* UV_EAGAIN is a uv_err_code, so it is stored directly */ | |
+            /* instead of going through uv_set_sys_error. */ | |
+            uv_last_error_.code = UV_EAGAIN; | |
+            uv_last_error_.sys_errno_ = err; | |
+            handle->read_cb((uv_stream_t*)handle, 0, buf); | |
+          } else { | |
+            /* Ouch! serious error. */ | |
+            uv_set_sys_error(err); | |
+            handle->read_cb((uv_stream_t*)handle, -1, buf); | |
+          } | |
+          break; | |
+        } | |
+      } | |
+ | |
+      if (handle->flags & UV_HANDLE_READING) { | |
+        /* Switch back to blocking mode so that we can use IOCP for 0-reads */ | |
+        mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT; | |
+        if (!SetNamedPipeHandleState(handle->connection->handle, &mode, NULL, NULL)) { | |
+          /* Report the failure and stop reading: no further 0-read is */ | |
+          /* posted from here. */ | |
+          err = GetLastError(); | |
+          uv_set_sys_error(err); | |
+          handle->flags &= ~UV_HANDLE_READING; | |
+          handle->read_cb((uv_stream_t*)handle, -1, buf); | |
+          break; | |
+        } | |
+ | |
+        /* Post another 0-read if still reading and not closing. */ | |
+        uv_pipe_queue_read(handle); | |
+      } | |
+ | |
+      break; | |
+ | |
+    case UV_ACCEPT: | |
+      if (req->error.code == UV_OK) { | |
+        /* Put the connection instance into accept state */ | |
+        handle->acceptConnection = uv_req_to_pipeinstance(req); | |
+        handle->acceptConnection->state = UV_PIPEINSTANCE_ACCEPTED; | |
+ | |
+        if (handle->connection_cb) { | |
+          handle->connection_cb((uv_handle_t*)handle, 0); | |
+        } | |
+      } else { | |
+        /* Ignore errors and continue listening */ | |
+        if (handle->flags & UV_HANDLE_LISTENING) { | |
+          uv_pipe_queue_accept(handle); | |
+        } | |
+      } | |
+      break; | |
+ | |
+    case UV_CONNECT: | |
+      if (req->cb) { | |
+        if (req->error.code == UV_OK) { | |
+          uv_init_connection((uv_stream_t*)handle); | |
+          ((uv_connect_cb)req->cb)(req, 0); | |
+        } else { | |
+          uv_last_error_ = req->error; | |
+          ((uv_connect_cb)req->cb)(req, -1); | |
+        } | |
+      } | |
+      break; | |
+ | |
+    default: | |
+      assert(0); | |
+  } | |
+ | |
+  /* The number of pending requests is now down by one */ | |
+  handle->reqs_pending--; | |
+ | |
+  /* Queue the handle's close callback if it is closing and there are no */ | |
+  /* more pending requests. */ | |
+  if (handle->flags & UV_HANDLE_CLOSING && | |
+      handle->reqs_pending == 0) { | |
+    uv_want_endgame((uv_handle_t*)handle); | |
+  } | |
+} | |
+ | |
+ | |
static int uv_timer_compare(uv_timer_t* a, uv_timer_t* b) { | |
if (a->due < b->due) | |
return -1; | |
@@ -1708,6 +2148,10 @@ static void uv_process_reqs() { | |
case UV_TCP: | |
uv_tcp_return_req((uv_tcp_t*)handle, req); | |
break; | |
+ | |
+ case UV_NAMED_PIPE: | |
+ uv_pipe_return_req((uv_pipe_t*)handle, req); | |
+ break; | |
case UV_ASYNC: | |
uv_async_return_req((uv_async_t*)handle, req); | |
@@ -2454,3 +2898,253 @@ error: | |
return -1; | |
} | |
+ | |
+/* | |
+ * Initializes a pipe handle. Always returns 0. | |
+ * | |
+ * The connection-side pointers are cleared so that close_pipe can be called | |
+ * safely on a handle that was never connected, accepted or turned into a | |
+ * server; uv_pipe_create, uv_pipe_accept and uv_pipe_connect overwrite them. | |
+ */ | |
+int uv_pipe_init(uv_pipe_t* handle) { | |
+  uv_stream_init((uv_stream_t*)handle); | |
+ | |
+  handle->type = UV_NAMED_PIPE; | |
+  handle->reqs_pending = 0; | |
+  handle->server = NULL; | |
+  handle->connection = NULL; | |
+ | |
+  uv_counters()->pipe_init++; | |
+ | |
+  return 0; | |
+} | |
+ | |
+ | |
+/* | |
+ * Turns an initialized pipe handle into a pipe server named `name`. | |
+ * | |
+ * The name is copied; the caller keeps ownership of `name`. Returns 0 on | |
+ * success. Returns -1 with the last error set from ERROR_INVALID_PARAMETER | |
+ * when `name` is NULL or does not fit in MAX_PIPENAME_LEN bytes including | |
+ * the terminator (previously an over-long name overflowed the copy). | |
+ * | |
+ * TODO: make this work with UTF8 name | |
+ */ | |
+int uv_pipe_create(uv_pipe_t* handle, char* name) { | |
+  size_t nameLen; | |
+ | |
+  if (!name) { | |
+    uv_set_sys_error(ERROR_INVALID_PARAMETER); | |
+    return -1; | |
+  } | |
+ | |
+  nameLen = strlen(name); | |
+  if (nameLen >= MAX_PIPENAME_LEN) { | |
+    uv_set_sys_error(ERROR_INVALID_PARAMETER); | |
+    return -1; | |
+  } | |
+ | |
+  handle->connections = NULL; | |
+  handle->acceptConnection = NULL; | |
+  handle->connectionCount = 0; | |
+ | |
+  /* Make our own copy of the pipe name */ | |
+  handle->name = (char*)malloc(MAX_PIPENAME_LEN); | |
+  if (!handle->name) { | |
+    uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); | |
+  } | |
+  /* The length was checked above, so the terminator is copied too. */ | |
+  memcpy(handle->name, name, nameLen + 1); | |
+ | |
+  handle->flags |= UV_HANDLE_PIPESERVER; | |
+  return 0; | |
+} | |
+ | |
+ | |
+/* | |
+ * Starts listening for connections on a pipe server. | |
+ * | |
+ * Creates `instanceCount` instances of the named pipe, associates each with | |
+ * the completion port and queues an accept on all of them. `cb` is invoked | |
+ * for every incoming connection. | |
+ * | |
+ * Returns 0 on success, or -1 with the last error set to: | |
+ *   UV_EALREADY - the handle is already listening or reading; | |
+ *   UV_ENOTSUP  - the handle was not set up with uv_pipe_create; | |
+ *   UV_EINVAL   - instanceCount is not positive; | |
+ *   or the translated system error of the failing Windows call, in which | |
+ *   case every instance created so far is closed again. | |
+ */ | |
+int uv_pipe_listen(uv_pipe_t* handle, int instanceCount, uv_connection_cb cb) { | |
+  int i, maxInstances; | |
+  DWORD sysError; | |
+  HANDLE pipeHandle; | |
+  uv_pipe_instance_t* pipeInstance; | |
+  /* Capacity of the preallocated buffer in elements, not bytes. */ | |
+  const size_t preallocated = | |
+      sizeof(handle->connectionsBuffer) / sizeof(handle->connectionsBuffer[0]); | |
+ | |
+  if (handle->flags & UV_HANDLE_LISTENING || | |
+      handle->flags & UV_HANDLE_READING) { | |
+    uv_last_error_.code = UV_EALREADY; | |
+    uv_last_error_.sys_errno_ = ERROR_SUCCESS; | |
+    return -1; | |
+  } | |
+ | |
+  if (!(handle->flags & UV_HANDLE_PIPESERVER)) { | |
+    uv_last_error_.code = UV_ENOTSUP; | |
+    uv_last_error_.sys_errno_ = ERROR_SUCCESS; | |
+    return -1; | |
+  } | |
+ | |
+  if (instanceCount <= 0) { | |
+    uv_set_sys_error(ERROR_INVALID_PARAMETER); | |
+    return -1; | |
+  } | |
+ | |
+  if ((size_t)instanceCount <= preallocated) { | |
+    /* Use preallocated connections buffer */ | |
+    handle->connections = handle->connectionsBuffer; | |
+  } else { | |
+    handle->connections = (uv_pipe_instance_t*)malloc(instanceCount * sizeof(uv_pipe_instance_t)); | |
+    if (!handle->connections) { | |
+      uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); | |
+    } | |
+  } | |
+ | |
+  maxInstances = instanceCount >= PIPE_UNLIMITED_INSTANCES ? PIPE_UNLIMITED_INSTANCES : instanceCount; | |
+ | |
+  for (i = 0; i < instanceCount; i++) { | |
+    pipeHandle = CreateNamedPipe(handle->name, | |
+                                 PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, | |
+                                 PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT, | |
+                                 maxInstances, | |
+                                 65536, | |
+                                 65536, | |
+                                 0, | |
+                                 NULL); | |
+ | |
+    if (pipeHandle == INVALID_HANDLE_VALUE) { | |
+      sysError = GetLastError(); | |
+      goto error; | |
+    } | |
+ | |
+    if (CreateIoCompletionPort(pipeHandle, | |
+                               uv_iocp_, | |
+                               (ULONG_PTR)handle, | |
+                               0) == NULL) { | |
+      sysError = GetLastError(); | |
+      /* Not yet stored in the instance table, so close it here. */ | |
+      CloseHandle(pipeHandle); | |
+      goto error; | |
+    } | |
+ | |
+    pipeInstance = &handle->connections[i]; | |
+    pipeInstance->handle = pipeHandle; | |
+    pipeInstance->state = UV_PIPEINSTANCE_PENDING; | |
+ | |
+    /* Count the instance right away so that close_pipe releases it if a */ | |
+    /* later iteration fails. */ | |
+    handle->connectionCount = i + 1; | |
+  } | |
+ | |
+  /* We don't need the pipe name anymore. */ | |
+  free(handle->name); | |
+  handle->name = NULL; | |
+ | |
+  handle->flags |= UV_HANDLE_LISTENING; | |
+  handle->connection_cb = cb; | |
+ | |
+  uv_pipe_queue_accept(handle); | |
+  return 0; | |
+ | |
+error: | |
+  close_pipe(handle, NULL, NULL); | |
+  handle->connectionCount = 0; | |
+  uv_set_sys_error(sysError); | |
+  return -1; | |
+} | |
+ | |
+/* | |
+ * Connects the pipe handle bound to `req` to the named pipe `name`. | |
+ * | |
+ * Always returns 0: the outcome, success or failure, is delivered through | |
+ * the pending-request queue to the request's connect callback. On failure | |
+ * the pipe is cleaned up with close_pipe and the request carries the | |
+ * translated system error. | |
+ * | |
+ * TODO: make this work with UTF8 name | |
+ */ | |
+int uv_pipe_connect(uv_req_t* req, char* name) { | |
+  DWORD sysError; | |
+  DWORD mode; | |
+  uv_pipe_t* handle = (uv_pipe_t*)req->handle; | |
+ | |
+  assert(!(req->flags & UV_REQ_PENDING)); | |
+ | |
+  req->type = UV_CONNECT; | |
+  handle->connection = &handle->clientConnection; | |
+  handle->server = NULL; | |
+  memset(&req->overlapped, 0, sizeof(req->overlapped)); | |
+ | |
+  handle->clientConnection.handle = CreateFile(name, | |
+                                               GENERIC_READ | GENERIC_WRITE, | |
+                                               0, | |
+                                               NULL, | |
+                                               OPEN_EXISTING, | |
+                                               FILE_FLAG_OVERLAPPED, | |
+                                               NULL); | |
+ | |
+  /* CreateFile completes synchronously; an invalid handle is a failure. */ | |
+  if (handle->clientConnection.handle == INVALID_HANDLE_VALUE) { | |
+    sysError = GetLastError(); | |
+    goto error; | |
+  } | |
+ | |
+  mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT; | |
+ | |
+  if (!SetNamedPipeHandleState(handle->clientConnection.handle, &mode, NULL, NULL)) { | |
+    sysError = GetLastError(); | |
+    goto error; | |
+  } | |
+ | |
+  if (CreateIoCompletionPort(handle->clientConnection.handle, | |
+                             uv_iocp_, | |
+                             (ULONG_PTR)handle, | |
+                             0) == NULL) { | |
+    sysError = GetLastError(); | |
+    goto error; | |
+  } | |
+ | |
+  req->error = uv_ok_; | |
+  req->flags |= UV_REQ_PENDING; | |
+  handle->connection->state = UV_PIPEINSTANCE_ACTIVE; | |
+  uv_insert_pending_req(req); | |
+  handle->reqs_pending++; | |
+  return 0; | |
+ | |
+error: | |
+  /* close_pipe closes the client handle if one was opened. */ | |
+  close_pipe(handle, NULL, NULL); | |
+  req->error = uv_new_sys_error(sysError); | |
+  req->flags |= UV_REQ_PENDING; | |
+  uv_insert_pending_req(req); | |
+  handle->reqs_pending++; | |
+  return 0; | |
+} | |
+ | |
+ | |
+/* | |
+ * Cleans up a uv_pipe_t (server, server-side connection or client | |
+ * connection) and the resources associated with it, then marks the handle | |
+ * UV_HANDLE_SHUT. | |
+ * | |
+ * `status` and `err` are optional out-parameters. *status is always | |
+ * written: 0 when nothing failed (including when there was nothing to | |
+ * clean up), -1 when the underlying Windows call failed, in which case | |
+ * *err receives the translated error. | |
+ * | |
+ * NOTE(review): a server-side connection keeps pointers into its server | |
+ * (`server`, and `connection` into the server's instance table); confirm | |
+ * that a server cannot be closed and freed before its connections. | |
+ */ | |
+static void close_pipe(uv_pipe_t* handle, int* status, uv_err_t* err) { | |
+  uv_pipe_instance_t* connection; | |
+  int i; | |
+ | |
+  /* Default to success so callers never read an unwritten status. */ | |
+  if (status) *status = 0; | |
+ | |
+  if (handle->flags & UV_HANDLE_PIPESERVER) { | |
+    if (handle->flags & UV_HANDLE_CONNECTION) { | |
+      /* | |
+       * The handle is for a connection instance on the pipe server. | |
+       * To clean-up, we call DisconnectNamedPipe, and return the instance to pending state, | |
+       * which will be ready to accept another pipe connection in uv_pipe_queue_accept. | |
+       */ | |
+      connection = handle->connection; | |
+      if (connection && connection->state != UV_PIPEINSTANCE_PENDING && connection->handle != INVALID_HANDLE_VALUE) { | |
+        /* Disconnect the connection instance and return it to pending state */ | |
+        if (DisconnectNamedPipe(connection->handle)) { | |
+          connection->state = UV_PIPEINSTANCE_PENDING; | |
+          handle->connection = NULL; | |
+          if (status) *status = 0; | |
+        } else { | |
+          if (status) *status = -1; | |
+          if (err) *err = uv_new_sys_error(GetLastError()); | |
+        } | |
+ | |
+        /* Queue accept now that the instance is in pending state. */ | |
+        if (!(handle->server->flags & UV_HANDLE_CLOSING)) { | |
+          uv_pipe_queue_accept(handle->server); | |
+        } | |
+      } | |
+    } else { | |
+      /* | |
+       * The handle is for the pipe server. | |
+       * To clean-up we close every connection instance that was made in uv_pipe_listen. | |
+       */ | |
+ | |
+      if (handle->name) { | |
+        free(handle->name); | |
+        handle->name = NULL; | |
+      } | |
+ | |
+      if (handle->connections) { | |
+        /* Go through the list of connections, and close each one with CloseHandle. */ | |
+        for (i = 0; i < handle->connectionCount; i++) { | |
+          connection = &handle->connections[i]; | |
+          if (connection->state != UV_PIPEINSTANCE_DISCONNECTED && connection->handle != INVALID_HANDLE_VALUE) { | |
+            CloseHandle(connection->handle); | |
+            connection->state = UV_PIPEINSTANCE_DISCONNECTED; | |
+            connection->handle = INVALID_HANDLE_VALUE; | |
+          } | |
+        } | |
+ | |
+        /* Free the connections buffer. */ | |
+        if (handle->connections != handle->connectionsBuffer) { | |
+          free(handle->connections); | |
+        } | |
+ | |
+        handle->connections = NULL; | |
+      } | |
+ | |
+      if (status) *status = 0; | |
+    } | |
+  } else { | |
+    /* | |
+     * The handle is for a connection instance on the pipe client. | |
+     * To clean-up, we close the client's pipe handle with CloseHandle. | |
+     */ | |
+    connection = handle->connection; | |
+    if (connection && connection->handle != INVALID_HANDLE_VALUE) { | |
+      if (CloseHandle(connection->handle)) { | |
+        connection->state = UV_PIPEINSTANCE_DISCONNECTED; | |
+        handle->connection = NULL; | |
+        if (status) *status = 0; | |
+      } else { | |
+        if (status) *status = -1; | |
+        if (err) *err = uv_new_sys_error(GetLastError()); | |
+      } | |
+    } | |
+  } | |
+ | |
+  handle->flags |= UV_HANDLE_SHUT; | |
+} | |
diff --git a/test/benchmark-list.h b/test/benchmark-list.h | |
index 6040e90..5e79878 100644 | |
--- a/test/benchmark-list.h | |
+++ b/test/benchmark-list.h | |
@@ -21,25 +21,35 @@ | |
BENCHMARK_DECLARE (sizes) | |
BENCHMARK_DECLARE (ping_pongs) | |
-BENCHMARK_DECLARE (pump100_client) | |
-BENCHMARK_DECLARE (pump1_client) | |
+BENCHMARK_DECLARE (tcp_pump100_client) | |
+BENCHMARK_DECLARE (tcp_pump1_client) | |
+BENCHMARK_DECLARE (pipe_pump100_client) | |
+BENCHMARK_DECLARE (pipe_pump1_client) | |
BENCHMARK_DECLARE (gethostbyname) | |
BENCHMARK_DECLARE (getaddrinfo) | |
-HELPER_DECLARE (pump_server) | |
-HELPER_DECLARE (echo_server) | |
+HELPER_DECLARE (tcp_pump_server) | |
+HELPER_DECLARE (pipe_pump_server) | |
+HELPER_DECLARE (tcp4_echo_server) | |
+HELPER_DECLARE (pipe_echo_server) | |
HELPER_DECLARE (dns_server) | |
TASK_LIST_START | |
BENCHMARK_ENTRY (sizes) | |
BENCHMARK_ENTRY (ping_pongs) | |
- BENCHMARK_HELPER (ping_pongs, echo_server) | |
+ BENCHMARK_HELPER (ping_pongs, tcp4_echo_server) | |
- BENCHMARK_ENTRY (pump100_client) | |
- BENCHMARK_HELPER (pump100_client, pump_server) | |
+ BENCHMARK_ENTRY (tcp_pump100_client) | |
+ BENCHMARK_HELPER (tcp_pump100_client, tcp_pump_server) | |
- BENCHMARK_ENTRY (pump1_client) | |
- BENCHMARK_HELPER (pump1_client, pump_server) | |
+ BENCHMARK_ENTRY (tcp_pump1_client) | |
+ BENCHMARK_HELPER (tcp_pump1_client, tcp_pump_server) | |
+ | |
+ BENCHMARK_ENTRY (pipe_pump100_client) | |
+ BENCHMARK_HELPER (pipe_pump100_client, pipe_pump_server) | |
+ | |
+ BENCHMARK_ENTRY (pipe_pump1_client) | |
+ BENCHMARK_HELPER (pipe_pump1_client, pipe_pump_server) | |
BENCHMARK_ENTRY (gethostbyname) | |
BENCHMARK_HELPER (gethostbyname, dns_server) | |
diff --git a/test/benchmark-pump.c b/test/benchmark-pump.c | |
index cd9c7d9..591dbb0 100644 | |
--- a/test/benchmark-pump.c | |
+++ b/test/benchmark-pump.c | |
@@ -45,7 +45,9 @@ static uv_buf_t buf_alloc(uv_stream_t*, size_t size); | |
static void buf_free(uv_buf_t uv_buf_t); | |
-static uv_tcp_t server; | |
+static uv_tcp_t tcpServer; | |
+static uv_pipe_t pipeServer; | |
+static uv_handle_t* server; | |
static struct sockaddr_in listen_addr; | |
static struct sockaddr_in connect_addr; | |
@@ -68,7 +70,10 @@ static char write_buffer[WRITE_BUFFER_SIZE]; | |
/* Make this as large as you need. */ | |
#define MAX_WRITE_HANDLES 1000 | |
-static uv_tcp_t write_handles[MAX_WRITE_HANDLES]; | |
+static stream_type type; | |
+ | |
+static uv_tcp_t tcp_write_handles[MAX_WRITE_HANDLES]; | |
+static uv_pipe_t pipe_write_handles[MAX_WRITE_HANDLES]; | |
static uv_timer_t timer_handle; | |
@@ -81,6 +86,7 @@ static double gbit(int64_t bytes, int64_t passed_ms) { | |
static void show_stats(uv_timer_t* handle, int status) { | |
int64_t diff; | |
+ int i; | |
#if PRINT_STATS | |
LOGF("connections: %d, write: %.1f gbit/s\n", | |
@@ -94,9 +100,13 @@ static void show_stats(uv_timer_t* handle, int status) { | |
uv_update_time(); | |
diff = uv_now() - start_time; | |
- LOGF("pump%d_client: %.1f gbit/s\n", write_sockets, | |
+ LOGF("%s_pump%d_client: %.1f gbit/s\n", type == TCP ? "tcp" : "pipe", write_sockets, | |
gbit(nsent_total, diff)); | |
+ for (i = 0; i < write_sockets; i++) { | |
+ uv_close(type == TCP ? (uv_handle_t*)&tcp_write_handles[i] : (uv_handle_t*)&pipe_write_handles[i], NULL); | |
+ } | |
+ | |
exit(0); | |
} | |
@@ -112,7 +122,7 @@ static void read_show_stats() { | |
uv_update_time(); | |
diff = uv_now() - start_time; | |
- LOGF("pump%d_server: %.1f gbit/s\n", max_read_sockets, | |
+ LOGF("%s_pump%d_server: %.1f gbit/s\n", type == TCP ? "tcp" : "pipe", max_read_sockets, | |
gbit(nrecv_total, diff)); | |
} | |
@@ -133,7 +143,7 @@ void read_sockets_close_cb(uv_handle_t* handle) { | |
*/ | |
if (uv_now() - start_time > 1000 && read_sockets == 0) { | |
read_show_stats(); | |
- uv_close((uv_handle_t*)&server, NULL); | |
+ uv_close(server, NULL); | |
} | |
} | |
@@ -154,7 +164,7 @@ static void start_stats_collection() { | |
} | |
-static void read_cb(uv_stream_t* tcp, ssize_t bytes, uv_buf_t buf) { | |
+static void read_cb(uv_stream_t* stream, ssize_t bytes, uv_buf_t buf) { | |
if (nrecv_total == 0) { | |
ASSERT(start_time == 0); | |
uv_update_time(); | |
@@ -162,7 +172,7 @@ static void read_cb(uv_stream_t* tcp, ssize_t bytes, uv_buf_t buf) { | |
} | |
if (bytes < 0) { | |
- uv_close((uv_handle_t*)tcp, read_sockets_close_cb); | |
+ uv_close((uv_handle_t*)stream, read_sockets_close_cb); | |
return; | |
} | |
@@ -187,7 +197,7 @@ static void write_cb(uv_req_t *req, int status) { | |
} | |
-static void do_write(uv_stream_t* tcp) { | |
+static void do_write(uv_stream_t* stream) { | |
uv_req_t* req; | |
uv_buf_t buf; | |
int r; | |
@@ -195,9 +205,9 @@ static void do_write(uv_stream_t* tcp) { | |
buf.base = (char*) &write_buffer; | |
buf.len = sizeof write_buffer; | |
- while (tcp->write_queue_size == 0) { | |
+ while (stream->write_queue_size == 0) { | |
req = req_alloc(); | |
- uv_req_init(req, (uv_handle_t*)tcp, write_cb); | |
+ uv_req_init(req, (uv_handle_t*)stream, write_cb); | |
r = uv_write(req, &buf, 1); | |
ASSERT(r == 0); | |
@@ -221,7 +231,7 @@ static void connect_cb(uv_req_t* req, int status) { | |
/* Yay! start writing */ | |
for (i = 0; i < write_sockets; i++) { | |
- do_write((uv_stream_t*)&write_handles[i]); | |
+ do_write(type == TCP ? (uv_stream_t*)&tcp_write_handles[i] : (uv_stream_t*)&pipe_write_handles[i]); | |
} | |
} | |
} | |
@@ -230,38 +240,55 @@ static void connect_cb(uv_req_t* req, int status) { | |
static void maybe_connect_some() { | |
uv_req_t* req; | |
uv_tcp_t* tcp; | |
+ uv_pipe_t* pipe; | |
int r; | |
while (max_connect_socket < TARGET_CONNECTIONS && | |
max_connect_socket < write_sockets + MAX_SIMULTANEOUS_CONNECTS) { | |
- tcp = &write_handles[max_connect_socket++]; | |
- | |
- r = uv_tcp_init(tcp); | |
- ASSERT(r == 0); | |
- | |
- req = req_alloc(); | |
- uv_req_init(req, (uv_handle_t*)tcp, connect_cb); | |
- r = uv_tcp_connect(req, connect_addr); | |
- ASSERT(r == 0); | |
+ if (type == TCP) { | |
+ tcp = &tcp_write_handles[max_connect_socket++]; | |
+ | |
+ r = uv_tcp_init(tcp); | |
+ ASSERT(r == 0); | |
+ | |
+ req = req_alloc(); | |
+ uv_req_init(req, (uv_handle_t*)tcp, connect_cb); | |
+ r = uv_tcp_connect(req, connect_addr); | |
+ ASSERT(r == 0); | |
+ } else { | |
+ pipe = &pipe_write_handles[max_connect_socket++]; | |
+ | |
+ r = uv_pipe_init(pipe); | |
+ ASSERT(r == 0); | |
+ | |
+ req = req_alloc(); | |
+ uv_req_init(req, (uv_handle_t*)pipe, connect_cb); | |
+ r = uv_pipe_connect(req, TEST_PIPENAME); | |
+ ASSERT(r == 0); | |
+ } | |
} | |
} | |
static void connection_cb(uv_handle_t* s, int status) { | |
- uv_tcp_t* tcp; | |
+ uv_stream_t* stream; | |
int r; | |
- ASSERT(&server == (uv_tcp_t*)s); | |
+ ASSERT(server == s); | |
ASSERT(status == 0); | |
- tcp = malloc(sizeof(uv_tcp_t)); | |
- | |
- uv_tcp_init(tcp); | |
+ if (type == TCP) { | |
+ stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t)); | |
+ uv_tcp_init((uv_tcp_t*)stream); | |
+ } else { | |
+ stream = (uv_stream_t*)malloc(sizeof(uv_pipe_t)); | |
+ uv_pipe_init((uv_pipe_t*)stream); | |
+ } | |
- r = uv_accept(s, (uv_stream_t*)tcp); | |
+ r = uv_accept(s, stream); | |
ASSERT(r == 0); | |
- r = uv_read_start((uv_stream_t*)tcp, buf_alloc, read_cb); | |
+ r = uv_read_start(stream, buf_alloc, read_cb); | |
ASSERT(r == 0); | |
read_sockets++; | |
@@ -317,7 +344,7 @@ typedef struct buf_list_s { | |
static buf_list_t* buf_freelist = NULL; | |
-static uv_buf_t buf_alloc(uv_stream_t* tcp, size_t size) { | |
+static uv_buf_t buf_alloc(uv_stream_t* stream, size_t size) { | |
buf_list_t* buf; | |
buf = buf_freelist; | |
@@ -342,18 +369,20 @@ static void buf_free(uv_buf_t uv_buf_t) { | |
} | |
-HELPER_IMPL(pump_server) { | |
+HELPER_IMPL(tcp_pump_server) { | |
int r; | |
+ type = TCP; | |
uv_init(); | |
listen_addr = uv_ip4_addr("0.0.0.0", TEST_PORT); | |
/* Server */ | |
- r = uv_tcp_init(&server); | |
+ server = (uv_handle_t*)&tcpServer; | |
+ r = uv_tcp_init(&tcpServer); | |
ASSERT(r == 0); | |
- r = uv_tcp_bind(&server, listen_addr); | |
+ r = uv_tcp_bind(&tcpServer, listen_addr); | |
ASSERT(r == 0); | |
- r = uv_tcp_listen(&server, MAX_WRITE_HANDLES, connection_cb); | |
+ r = uv_tcp_listen(&tcpServer, MAX_WRITE_HANDLES, connection_cb); | |
ASSERT(r == 0); | |
uv_run(); | |
@@ -362,9 +391,31 @@ HELPER_IMPL(pump_server) { | |
} | |
-void pump(int n) { | |
+HELPER_IMPL(pipe_pump_server) { | |
+ int r; | |
+ type = PIPE; | |
+ | |
+ uv_init(); | |
+ | |
+ /* Server */ | |
+ server = (uv_handle_t*)&pipeServer; | |
+ r = uv_pipe_init(&pipeServer); | |
+ ASSERT(r == 0); | |
+ r = uv_pipe_create(&pipeServer, TEST_PIPENAME); | |
+ ASSERT(r == 0); | |
+ r = uv_pipe_listen(&pipeServer, MAX_WRITE_HANDLES, connection_cb); | |
+ ASSERT(r == 0); | |
+ | |
+ uv_run(); | |
+ | |
+ return 0; | |
+} | |
+ | |
+ | |
+void tcp_pump(int n) { | |
ASSERT(n <= MAX_WRITE_HANDLES); | |
TARGET_CONNECTIONS = n; | |
+ type = TCP; | |
uv_init(); | |
@@ -377,13 +428,39 @@ void pump(int n) { | |
} | |
-BENCHMARK_IMPL(pump100_client) { | |
- pump(100); | |
+void pipe_pump(int n) { | |
+ ASSERT(n <= MAX_WRITE_HANDLES); | |
+ TARGET_CONNECTIONS = n; | |
+ type = PIPE; | |
+ | |
+ uv_init(); | |
+ | |
+ /* Start making connections */ | |
+ maybe_connect_some(); | |
+ | |
+ uv_run(); | |
+} | |
+ | |
+ | |
+BENCHMARK_IMPL(tcp_pump100_client) { | |
+ tcp_pump(100); | |
+ return 0; | |
+} | |
+ | |
+ | |
+BENCHMARK_IMPL(tcp_pump1_client) { | |
+ tcp_pump(1); | |
+ return 0; | |
+} | |
+ | |
+ | |
+BENCHMARK_IMPL(pipe_pump100_client) { | |
+ pipe_pump(100); | |
return 0; | |
} | |
-BENCHMARK_IMPL(pump1_client) { | |
- pump(1); | |
+BENCHMARK_IMPL(pipe_pump1_client) { | |
+ pipe_pump(1); | |
return 0; | |
} | |
diff --git a/test/benchmark-sizes.c b/test/benchmark-sizes.c | |
index 830de3a..9038645 100644 | |
--- a/test/benchmark-sizes.c | |
+++ b/test/benchmark-sizes.c | |
@@ -26,6 +26,7 @@ | |
BENCHMARK_IMPL(sizes) { | |
LOGF("uv_req_t: %u bytes\n", (unsigned int) sizeof(uv_req_t)); | |
LOGF("uv_tcp_t: %u bytes\n", (unsigned int) sizeof(uv_tcp_t)); | |
+ LOGF("uv_pipe_t: %u bytes\n", (unsigned int) sizeof(uv_pipe_t)); | |
LOGF("uv_prepare_t: %u bytes\n", (unsigned int) sizeof(uv_prepare_t)); | |
LOGF("uv_check_t: %u bytes\n", (unsigned int) sizeof(uv_check_t)); | |
LOGF("uv_idle_t: %u bytes\n", (unsigned int) sizeof(uv_idle_t)); | |
diff --git a/test/echo-server.c b/test/echo-server.c | |
index 9addc54..b8c3500 100644 | |
--- a/test/echo-server.c | |
+++ b/test/echo-server.c | |
@@ -24,19 +24,16 @@ | |
#include <stdio.h> | |
#include <stdlib.h> | |
- | |
typedef struct { | |
uv_req_t req; | |
uv_buf_t buf; | |
} write_req_t; | |
- | |
static int server_closed; | |
-static uv_tcp_t server; | |
- | |
-static int server6_closed; | |
-static uv_tcp_t server6; | |
- | |
+static stream_type serverType; | |
+static uv_tcp_t tcpServer; | |
+static uv_pipe_t pipeServer; | |
+static uv_handle_t* server; | |
static void after_write(uv_req_t* req, int status); | |
static void after_read(uv_stream_t*, ssize_t nread, uv_buf_t buf); | |
@@ -98,10 +95,8 @@ static void after_read(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) { | |
if (!server_closed) { | |
for (i = 0; i < nread; i++) { | |
if (buf.base[i] == 'Q') { | |
- uv_close((uv_handle_t*)&server, on_server_close); | |
+ uv_close(server, on_server_close); | |
server_closed = 1; | |
- uv_close((uv_handle_t*)&server6, on_server_close); | |
- server6_closed = 1; | |
} | |
} | |
} | |
@@ -131,7 +126,7 @@ static uv_buf_t echo_alloc(uv_stream_t* handle, size_t suggested_size) { | |
static void on_connection(uv_handle_t* server, int status) { | |
- uv_tcp_t* handle; | |
+ uv_handle_t* handle; | |
int r; | |
if (status != 0) { | |
@@ -139,10 +134,17 @@ static void on_connection(uv_handle_t* server, int status) { | |
} | |
ASSERT(status == 0); | |
- handle = (uv_tcp_t*) malloc(sizeof *handle); | |
- ASSERT(handle != NULL); | |
+ if (serverType == TCP) { | |
+ handle = (uv_handle_t*) malloc(sizeof(uv_tcp_t)); | |
+ ASSERT(handle != NULL); | |
- uv_tcp_init(handle); | |
+ uv_tcp_init((uv_tcp_t*)handle); | |
+ } else { | |
+ handle = (uv_handle_t*) malloc(sizeof(uv_pipe_t)); | |
+ ASSERT(handle != NULL); | |
+ | |
+ uv_pipe_init((uv_pipe_t*)handle); | |
+ } | |
/* associate server with stream */ | |
handle->data = server; | |
@@ -156,37 +158,50 @@ static void on_connection(uv_handle_t* server, int status) { | |
static void on_server_close(uv_handle_t* handle) { | |
- ASSERT(handle == (uv_handle_t*)&server || handle == (uv_handle_t*)&server6); | |
+ ASSERT(handle == server); | |
} | |
-static int echo_start(int port) { | |
+static int tcp4_echo_start(int port) { | |
struct sockaddr_in addr = uv_ip4_addr("0.0.0.0", port); | |
- struct sockaddr_in6 addr6 = uv_ip6_addr("::1", port); | |
int r; | |
- | |
- r = uv_tcp_init(&server); | |
+ | |
+ server = (uv_handle_t*)&tcpServer; | |
+ serverType = TCP; | |
+ | |
+ r = uv_tcp_init(&tcpServer); | |
if (r) { | |
/* TODO: Error codes */ | |
fprintf(stderr, "Socket creation error\n"); | |
return 1; | |
} | |
- r = uv_tcp_bind(&server, addr); | |
+ r = uv_tcp_bind(&tcpServer, addr); | |
if (r) { | |
/* TODO: Error codes */ | |
fprintf(stderr, "Bind error\n"); | |
return 1; | |
} | |
- r = uv_tcp_listen(&server, 128, on_connection); | |
+ r = uv_tcp_listen(&tcpServer, 128, on_connection); | |
if (r) { | |
/* TODO: Error codes */ | |
fprintf(stderr, "Listen error\n"); | |
return 1; | |
} | |
- r = uv_tcp_init(&server6); | |
+ return 0; | |
+} | |
+ | |
+ | |
+static int tcp6_echo_start(int port) { | |
+ struct sockaddr_in6 addr6 = uv_ip6_addr("::1", port); | |
+ int r; | |
+ | |
+ server = (uv_handle_t*)&tcpServer; | |
+ serverType = TCP; | |
+ | |
+ r = uv_tcp_init(&tcpServer); | |
if (r) { | |
/* TODO: Error codes */ | |
fprintf(stderr, "Socket creation error\n"); | |
@@ -194,14 +209,45 @@ static int echo_start(int port) { | |
} | |
/* IPv6 is optional as not all platforms support it */ | |
- r = uv_tcp_bind6(&server6, addr6); | |
+ r = uv_tcp_bind6(&tcpServer, addr6); | |
if (r) { | |
/* show message but return OK */ | |
fprintf(stderr, "IPv6 not supported\n"); | |
return 0; | |
} | |
- r = uv_tcp_listen(&server6, 128, on_connection); | |
+ r = uv_tcp_listen(&tcpServer, 128, on_connection); | |
+ if (r) { | |
+ /* TODO: Error codes */ | |
+ fprintf(stderr, "Listen error\n"); | |
+ return 1; | |
+ } | |
+ | |
+ return 0; | |
+} | |
+ | |
+ | |
+static int pipe_echo_start(char* pipeName) { | |
+ int r; | |
+ | |
+ server = (uv_handle_t*)&pipeServer; | |
+ serverType = PIPE; | |
+ | |
+ r = uv_pipe_init(&pipeServer); | |
+ if (r) { | |
+ /* TODO: Error codes */ | |
+ fprintf(stderr, "Pipe creation error\n"); | |
+ return 1; | |
+ } | |
+ | |
+ r = uv_pipe_create(&pipeServer, pipeName); | |
+ if (r) { | |
+ /* TODO: Error codes */ | |
+ fprintf(stderr, "create error\n"); | |
+ return 1; | |
+ } | |
+ | |
+ r = uv_pipe_listen(&pipeServer, 1, on_connection); | |
if (r) { | |
/* TODO: Error codes */ | |
fprintf(stderr, "Listen error on IPv6\n"); | |
@@ -212,9 +258,30 @@ static int echo_start(int port) { | |
} | |
-HELPER_IMPL(echo_server) { | |
+HELPER_IMPL(tcp4_echo_server) { | |
uv_init(); | |
- if (echo_start(TEST_PORT)) | |
+ if (tcp4_echo_start(TEST_PORT)) | |
+ return 1; | |
+ | |
+ uv_run(); | |
+ return 0; | |
+} | |
+ | |
+ | |
+HELPER_IMPL(tcp6_echo_server) { | |
+ uv_init(); | |
+ if (tcp6_echo_start(TEST_PORT)) | |
+ return 1; | |
+ | |
+ uv_run(); | |
+ return 0; | |
+} | |
+ | |
+ | |
+HELPER_IMPL(pipe_echo_server) { | |
+ uv_init(); | |
+ | |
+ if (pipe_echo_start(TEST_PIPENAME)) | |
return 1; | |
uv_run(); | |
diff --git a/test/task.h b/test/task.h | |
index 8d9a1e8..d47c209 100644 | |
--- a/test/task.h | |
+++ b/test/task.h | |
@@ -30,6 +30,17 @@ | |
#define TEST_PORT 9123 | |
#define TEST_PORT_2 9124 | |
+#ifdef _WIN32 | |
+# define TEST_PIPENAME "\\\\.\\pipe\\uv-test" | |
+#else | |
+# /* TODO: define unix pipe name */ | |
+# define TEST_PIPENAME "" | |
+#endif | |
+ | |
+typedef enum { | |
+ TCP = 0, | |
+ PIPE | |
+} stream_type; | |
/* Log to stderr. */ | |
#define LOG(...) fprintf(stderr, "%s", __VA_ARGS__) | |
diff --git a/test/test-list.h b/test/test-list.h | |
index 190574a..0cee34f 100644 | |
--- a/test/test-list.h | |
+++ b/test/test-list.h | |
@@ -19,8 +19,9 @@ | |
* IN THE SOFTWARE. | |
*/ | |
-TEST_DECLARE (ping_pong) | |
-TEST_DECLARE (ping_pong_v6) | |
+TEST_DECLARE (tcp_ping_pong) | |
+TEST_DECLARE (tcp_ping_pong_v6) | |
+TEST_DECLARE (pipe_ping_pong) | |
TEST_DECLARE (delayed_accept) | |
TEST_DECLARE (tcp_writealot) | |
TEST_DECLARE (bind_error_addrinuse) | |
@@ -54,20 +55,25 @@ TEST_DECLARE (getaddrinfo_concurrent) | |
TEST_DECLARE (gethostbyname) | |
TEST_DECLARE (fail_always) | |
TEST_DECLARE (pass_always) | |
-HELPER_DECLARE (echo_server) | |
+HELPER_DECLARE (tcp4_echo_server) | |
+HELPER_DECLARE (tcp6_echo_server) | |
+HELPER_DECLARE (pipe_echo_server) | |
TASK_LIST_START | |
- TEST_ENTRY (ping_pong) | |
- TEST_HELPER (ping_pong, echo_server) | |
+ TEST_ENTRY (tcp_ping_pong) | |
+ TEST_HELPER (tcp_ping_pong, tcp4_echo_server) | |
- TEST_ENTRY (ping_pong_v6) | |
- TEST_HELPER (ping_pong_v6, echo_server) | |
+ TEST_ENTRY (tcp_ping_pong_v6) | |
+ TEST_HELPER (tcp_ping_pong_v6, tcp6_echo_server) | |
+ | |
+ TEST_ENTRY (pipe_ping_pong) | |
+ TEST_HELPER (pipe_ping_pong, pipe_echo_server) | |
TEST_ENTRY (delayed_accept) | |
TEST_ENTRY (tcp_writealot) | |
- TEST_HELPER (tcp_writealot, echo_server) | |
+ TEST_HELPER (tcp_writealot, tcp4_echo_server) | |
TEST_ENTRY (bind_error_addrinuse) | |
TEST_ENTRY (bind_error_addrnotavail_1) | |
@@ -86,10 +92,10 @@ TASK_LIST_START | |
TEST_ENTRY (connection_fail_doesnt_auto_close) | |
TEST_ENTRY (shutdown_eof) | |
- TEST_HELPER (shutdown_eof, echo_server) | |
+ TEST_HELPER (shutdown_eof, tcp4_echo_server) | |
TEST_ENTRY (callback_stack) | |
- TEST_HELPER (callback_stack, echo_server) | |
+ TEST_HELPER (callback_stack, tcp4_echo_server) | |
TEST_ENTRY (timer) | |
@@ -113,7 +119,7 @@ TASK_LIST_START | |
TEST_ENTRY (getaddrinfo_concurrent) | |
TEST_ENTRY (gethostbyname) | |
- TEST_HELPER (gethostbyname, echo_server) | |
+ TEST_HELPER (gethostbyname, tcp4_echo_server) | |
#if 0 | |
/* These are for testing the test runner. */ | |
diff --git a/test/test-ping-pong.c b/test/test-ping-pong.c | |
index 34de78b..97fe794 100644 | |
--- a/test/test-ping-pong.c | |
+++ b/test/test-ping-pong.c | |
@@ -39,7 +39,10 @@ static char PING[] = "PING\n"; | |
typedef struct { | |
int pongs; | |
int state; | |
- uv_tcp_t tcp; | |
+ union { | |
+ uv_tcp_t tcp; | |
+ uv_pipe_t pipe; | |
+ }; | |
uv_req_t connect_req; | |
uv_req_t read_req; | |
char read_buffer[BUFSIZE]; | |
@@ -93,11 +96,11 @@ static void pinger_write_ping(pinger_t* pinger) { | |
} | |
-static void pinger_read_cb(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) { | |
+static void pinger_read_cb(uv_stream_t* stream, ssize_t nread, uv_buf_t buf) { | |
unsigned int i; | |
pinger_t* pinger; | |
- pinger = (pinger_t*)tcp->data; | |
+ pinger = (pinger_t*)stream->data; | |
if (nread < 0) { | |
ASSERT(uv_last_error().code == UV_EOF); | |
@@ -142,9 +145,10 @@ static void pinger_on_connect(uv_req_t *req, int status) { | |
} | |
-static void pinger_new() { | |
+/* same ping-pong test, but using IPv6 connection */ | |
+static void tcp_pinger_v6_new() { | |
int r; | |
- struct sockaddr_in server_addr = uv_ip4_addr("127.0.0.1", TEST_PORT); | |
+ struct sockaddr_in6 server_addr = uv_ip6_addr("::1", TEST_PORT); | |
pinger_t *pinger; | |
pinger = (pinger_t*)malloc(sizeof(*pinger)); | |
@@ -161,27 +165,37 @@ static void pinger_new() { | |
uv_req_init(&pinger->connect_req, (uv_handle_t*)(&pinger->tcp), | |
(void *(*)(void *))pinger_on_connect); | |
- r = uv_tcp_connect(&pinger->connect_req, server_addr); | |
+ r = uv_tcp_connect6(&pinger->connect_req, server_addr); | |
ASSERT(!r); | |
} | |
-TEST_IMPL(ping_pong) { | |
- uv_init(); | |
+static void tcp_pinger_new() { | |
+ int r; | |
+ struct sockaddr_in server_addr = uv_ip4_addr("127.0.0.1", TEST_PORT); | |
+ pinger_t *pinger; | |
- pinger_new(); | |
- uv_run(); | |
+ pinger = (pinger_t*)malloc(sizeof(*pinger)); | |
+ pinger->state = 0; | |
+ pinger->pongs = 0; | |
- ASSERT(completed_pingers == 1); | |
+ /* Try to connect to the server and do NUM_PINGS ping-pongs. */ | |
+ r = uv_tcp_init(&pinger->tcp); | |
+ pinger->tcp.data = pinger; | |
+ ASSERT(!r); | |
- return 0; | |
+ /* We are never doing multiple reads/connects at a time anyway. */ | |
+ /* so these handles can be pre-initialized. */ | |
+ uv_req_init(&pinger->connect_req, (uv_handle_t*)(&pinger->tcp), | |
+ pinger_on_connect); | |
+ | |
+ r = uv_tcp_connect(&pinger->connect_req, server_addr); | |
+ ASSERT(!r); | |
} | |
-/* same ping-pong test, but using IPv6 connection */ | |
-static void pinger_v6_new() { | |
+static void pipe_pinger_new() { | |
int r; | |
- struct sockaddr_in6 server_addr = uv_ip6_addr("::1", TEST_PORT); | |
pinger_t *pinger; | |
pinger = (pinger_t*)malloc(sizeof(*pinger)); | |
@@ -189,24 +203,48 @@ static void pinger_v6_new() { | |
pinger->pongs = 0; | |
/* Try to connec to the server and do NUM_PINGS ping-pongs. */ | |
- r = uv_tcp_init(&pinger->tcp); | |
- pinger->tcp.data = pinger; | |
+ r = uv_pipe_init(&pinger->pipe); | |
+ pinger->pipe.data = pinger; | |
ASSERT(!r); | |
/* We are never doing multiple reads/connects at a time anyway. */ | |
/* so these handles can be pre-initialized. */ | |
- uv_req_init(&pinger->connect_req, (uv_handle_t*)(&pinger->tcp), | |
+ uv_req_init(&pinger->connect_req, (uv_handle_t*)(&pinger->pipe), | |
(void *(*)(void *))pinger_on_connect); | |
- r = uv_tcp_connect6(&pinger->connect_req, server_addr); | |
+ r = uv_pipe_connect(&pinger->connect_req, TEST_PIPENAME); | |
ASSERT(!r); | |
} | |
-TEST_IMPL(ping_pong_v6) { | |
+TEST_IMPL(tcp_ping_pong) { | |
+ uv_init(); | |
+ | |
+ tcp_pinger_new(); | |
+ uv_run(); | |
+ | |
+ ASSERT(completed_pingers == 1); | |
+ | |
+ return 0; | |
+} | |
+ | |
+ | |
+TEST_IMPL(tcp_ping_pong_v6) { | |
+ uv_init(); | |
+ | |
+ tcp_pinger_v6_new(); | |
+ uv_run(); | |
+ | |
+ ASSERT(completed_pingers == 1); | |
+ | |
+ return 0; | |
+} | |
+ | |
+ | |
+TEST_IMPL(pipe_ping_pong) { | |
uv_init(); | |
- pinger_v6_new(); | |
+ pipe_pinger_new(); | |
uv_run(); | |
ASSERT(completed_pingers == 1); | |
-- | |
1.7.4.msysgit.0 | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment