Skip to content

Instantly share code, notes, and snippets.

@igorzi
Created July 9, 2011 07:26
Show Gist options
  • Save igorzi/1073416 to your computer and use it in GitHub Desktop.
Save igorzi/1073416 to your computer and use it in GitHub Desktop.
pipes
From 4b9eaed4760c1f25d41bf8431b53bbf7f085b787 Mon Sep 17 00:00:00 2001
From: Igor Zinkovsky <igorzi@microsoft.com>
Date: Fri, 1 Jul 2011 17:54:17 -0700
Subject: [PATCH] Named pipes implementation for Windows
---
include/uv-unix.h | 5 +
include/uv-win.h | 64 ++++-
include/uv.h | 26 ++-
src/uv-unix.c | 19 ++
src/uv-win.c | 748 ++++++++++++++++++++++++++++++++++++++++++++++--
test/benchmark-list.h | 28 ++-
test/benchmark-pump.c | 151 ++++++++---
test/benchmark-sizes.c | 1 +
test/echo-server.c | 119 ++++++--
test/task.h | 11 +
test/test-list.h | 28 ++-
test/test-ping-pong.c | 80 ++++--
12 files changed, 1135 insertions(+), 145 deletions(-)
diff --git a/include/uv-unix.h b/include/uv-unix.h
index f68a7fd..38ecca9 100644
--- a/include/uv-unix.h
+++ b/include/uv-unix.h
@@ -73,6 +73,11 @@ typedef struct {
ev_io write_watcher; \
ngx_queue_t write_queue; \
ngx_queue_t write_completed_queue;
+
+
+/* UV_NAMED_PIPE */
+#define UV_PIPE_PRIVATE_TYPEDEF \
+#define UV_PIPE_PRIVATE_FIELDS \
/* UV_PREPARE */ \
diff --git a/include/uv-win.h b/include/uv-win.h
index e6254fe..ee29e8c 100644
--- a/include/uv-win.h
+++ b/include/uv-win.h
@@ -31,6 +31,8 @@
#include "tree.h"
+#define MAX_PIPENAME_LEN 256
+
/**
* It should be possible to cast uv_buf_t[] to WSABUF[]
* see http://msdn.microsoft.com/en-us/library/ms741542(v=vs.85).aspx
@@ -40,6 +42,17 @@ typedef struct uv_buf_t {
char* base;
} uv_buf_t;
+/*
+ * Private uv_pipe_instance state.
+ */
+typedef enum {
+ UV_PIPEINSTANCE_DISCONNECTED = 0,
+ UV_PIPEINSTANCE_PENDING,
+ UV_PIPEINSTANCE_WAITING,
+ UV_PIPEINSTANCE_ACCEPTED,
+ UV_PIPEINSTANCE_ACTIVE
+} uv_pipeinstance_state;
+
#define UV_REQ_PRIVATE_FIELDS \
union { \
/* Used by I/O operations */ \
@@ -51,31 +64,56 @@ typedef struct uv_buf_t {
int flags; \
uv_err_t error; \
struct uv_req_s* next_req;
+
+#define uv_stream_connection_fields \
+ unsigned int write_reqs_pending; \
+ uv_req_t* shutdown_req;
+
+#define uv_stream_server_fields \
+ uv_connection_cb connection_cb;
#define UV_STREAM_PRIVATE_FIELDS \
+ unsigned int reqs_pending; \
uv_alloc_cb alloc_cb; \
uv_read_cb read_cb; \
struct uv_req_s read_req; \
-
-#define uv_tcp_connection_fields \
- unsigned int write_reqs_pending; \
- uv_req_t* shutdown_req;
-
-#define uv_tcp_server_fields \
- uv_connection_cb connection_cb; \
- SOCKET accept_socket; \
- struct uv_req_s accept_req; \
- char accept_buffer[sizeof(struct sockaddr_storage) * 2 + 32];
+ union { \
+ struct { uv_stream_connection_fields }; \
+ struct { uv_stream_server_fields }; \
+ };
#define UV_TCP_PRIVATE_FIELDS \
- unsigned int reqs_pending; \
union { \
SOCKET socket; \
HANDLE handle; \
}; \
+ SOCKET accept_socket; \
+ char accept_buffer[sizeof(struct sockaddr_storage) * 2 + 32]; \
+ struct uv_req_s accept_req;
+
+#define uv_pipe_server_fields \
+ char* name; \
+ int connectionCount; \
+ uv_pipe_instance_t* connections; \
+ uv_pipe_instance_t* acceptConnection; \
+ uv_pipe_instance_t connectionsBuffer[4];
+
+#define uv_pipe_connection_fields \
+ uv_pipe_t* server; \
+ uv_pipe_instance_t* connection; \
+ uv_pipe_instance_t clientConnection;
+
+#define UV_PIPE_PRIVATE_TYPEDEF \
+ typedef struct uv_pipe_instance_s { \
+ HANDLE handle; \
+ uv_pipeinstance_state state; \
+ uv_req_t accept_req; \
+ } uv_pipe_instance_t;
+
+#define UV_PIPE_PRIVATE_FIELDS \
union { \
- struct { uv_tcp_connection_fields }; \
- struct { uv_tcp_server_fields }; \
+ struct { uv_pipe_server_fields }; \
+ struct { uv_pipe_connection_fields }; \
};
#define UV_TIMER_PRIVATE_FIELDS \
diff --git a/include/uv.h b/include/uv.h
index 6aecb28..5e0c346 100644
--- a/include/uv.h
+++ b/include/uv.h
@@ -43,6 +43,7 @@ typedef struct uv_err_s uv_err_t;
typedef struct uv_handle_s uv_handle_t;
typedef struct uv_stream_s uv_stream_t;
typedef struct uv_tcp_s uv_tcp_t;
+typedef struct uv_pipe_s uv_pipe_t;
typedef struct uv_timer_s uv_timer_t;
typedef struct uv_prepare_s uv_prepare_t;
typedef struct uv_check_s uv_check_t;
@@ -124,7 +125,8 @@ typedef enum {
UV_EAIFAMNOSUPPORT,
UV_EAINONAME,
UV_EAISERVICE,
- UV_EAISOCKTYPE
+ UV_EAISOCKTYPE,
+ UV_ESHUTDOWN
} uv_err_code;
typedef enum {
@@ -288,6 +290,26 @@ int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb);
/*
+ * A subclass of uv_stream_t representing a pipe stream or pipe server.
+ */
+UV_PIPE_PRIVATE_TYPEDEF
+
+struct uv_pipe_s {
+ UV_HANDLE_FIELDS
+ UV_STREAM_FIELDS
+ UV_PIPE_PRIVATE_FIELDS
+};
+
+int uv_pipe_init(uv_pipe_t* handle);
+
+int uv_pipe_create(uv_pipe_t* handle, char* name);
+
+int uv_pipe_listen(uv_pipe_t* handle, int instanceCount, uv_connection_cb cb);
+
+int uv_pipe_connect(uv_req_t* req, char* name);
+
+
+/*
* Subclass of uv_handle_t. libev wrapper. Every active prepare handle gets
* its callback called exactly once per loop iteration, just before the
* system blocks to wait for completed i/o.
@@ -478,7 +500,9 @@ union uv_any_handle {
typedef struct {
uint64_t req_init;
uint64_t handle_init;
+ uint64_t stream_init;
uint64_t tcp_init;
+ uint64_t pipe_init;
uint64_t prepare_init;
uint64_t check_init;
uint64_t idle_init;
diff --git a/src/uv-unix.c b/src/uv-unix.c
index 786657b..6bffaf2 100644
--- a/src/uv-unix.c
+++ b/src/uv-unix.c
@@ -1594,3 +1594,22 @@ int uv_getaddrinfo(uv_getaddrinfo_t* handle,
return 0;
}
+
+int uv_pipe_init(uv_pipe_t* handle) {
+ assert(0 && "implement me");
+}
+
+
+int uv_pipe_create(uv_pipe_t* handle, char* name) {
+ assert(0 && "implement me");
+}
+
+
+int uv_pipe_listen(uv_pipe_t* handle, int instanceCount, uv_connection_cb cb) {
+ assert(0 && "implement me");
+}
+
+
+int uv_pipe_connect(uv_req_t* req, char* name) {
+ assert(0 && "implement me");
+}
diff --git a/src/uv-win.c b/src/uv-win.c
index b92eb36..a27bc92 100644
--- a/src/uv-win.c
+++ b/src/uv-win.c
@@ -164,6 +164,7 @@ static LPFN_TRANSMITFILE pTransmitFile6;
#define UV_HANDLE_ENDGAME_QUEUED 0x0400
#define UV_HANDLE_BIND_ERROR 0x1000
#define UV_HANDLE_IPV6 0x2000
+#define UV_HANDLE_PIPESERVER 0x4000
/*
* Private uv_req flags.
@@ -242,6 +243,8 @@ void uv_ares_process(uv_ares_action_t* handle, uv_req_t* req);
void uv_ares_task_cleanup(uv_ares_task_t* handle, uv_req_t* req);
void uv_ares_poll(uv_timer_t* handle, int status);
+static void close_pipe(uv_pipe_t* handle, int* status, uv_err_t* err);
+
/* memory used per ares_channel */
struct uv_ares_channel_s {
ares_channel channel;
@@ -373,6 +376,7 @@ static uv_err_code uv_translate_sys_error(int sys_errno) {
case ERROR_INVALID_FLAGS: return UV_EBADF;
case ERROR_INVALID_PARAMETER: return UV_EINVAL;
case ERROR_NO_UNICODE_TRANSLATION: return UV_ECHARSET;
+ case ERROR_BROKEN_PIPE: return UV_EOF;
default: return UV_UNKNOWN;
}
}
@@ -527,6 +531,11 @@ static uv_req_t* uv_overlapped_to_req(OVERLAPPED* overlapped) {
}
+static uv_pipe_instance_t* uv_req_to_pipeinstance(uv_req_t* req) {
+ return CONTAINING_RECORD(req, uv_pipe_instance_t, accept_req);
+}
+
+
static void uv_insert_pending_req(uv_req_t* req) {
req->next_req = NULL;
if (uv_pending_reqs_tail_) {
@@ -594,24 +603,20 @@ static int uv_tcp_set_socket(uv_tcp_t* handle, SOCKET socket) {
}
-static void uv_tcp_init_connection(uv_tcp_t* handle) {
+static void uv_init_connection(uv_stream_t* handle) {
handle->flags |= UV_HANDLE_CONNECTION;
handle->write_reqs_pending = 0;
uv_req_init(&(handle->read_req), (uv_handle_t*)handle, NULL);
}
-int uv_tcp_init(uv_tcp_t* handle) {
- handle->socket = INVALID_SOCKET;
+int uv_stream_init(uv_stream_t* handle) {
handle->write_queue_size = 0;
- handle->type = UV_TCP;
handle->flags = 0;
- handle->reqs_pending = 0;
handle->error = uv_ok_;
- handle->accept_socket = INVALID_SOCKET;
uv_counters()->handle_init++;
- uv_counters()->tcp_init++;
+ uv_counters()->stream_init++;
uv_refs_++;
@@ -619,6 +624,20 @@ int uv_tcp_init(uv_tcp_t* handle) {
}
+int uv_tcp_init(uv_tcp_t* handle) {
+ uv_stream_init((uv_stream_t*)handle);
+
+ handle->socket = INVALID_SOCKET;
+ handle->type = UV_TCP;
+ handle->reqs_pending = 0;
+ handle->accept_socket = INVALID_SOCKET;
+
+ uv_counters()->tcp_init++;
+
+ return 0;
+}
+
+
static void uv_tcp_endgame(uv_tcp_t* handle) {
uv_err_t err;
int status;
@@ -658,6 +677,39 @@ static void uv_tcp_endgame(uv_tcp_t* handle) {
}
+static void uv_pipe_endgame(uv_pipe_t* handle) {
+ uv_err_t err;
+ int status;
+
+ if (handle->flags & UV_HANDLE_SHUTTING &&
+ !(handle->flags & UV_HANDLE_SHUT) &&
+ handle->write_reqs_pending == 0) {
+ close_pipe(handle, &status, &err);
+
+ if (handle->shutdown_req->cb) {
+ handle->shutdown_req->flags &= ~UV_REQ_PENDING;
+ if (status == -1) {
+ uv_last_error_ = err;
+ }
+ ((uv_shutdown_cb)handle->shutdown_req->cb)(handle->shutdown_req, status);
+ }
+ handle->reqs_pending--;
+ }
+
+ if (handle->flags & UV_HANDLE_CLOSING &&
+ handle->reqs_pending == 0) {
+ assert(!(handle->flags & UV_HANDLE_CLOSED));
+ handle->flags |= UV_HANDLE_CLOSED;
+
+ if (handle->close_cb) {
+ handle->close_cb((uv_handle_t*)handle);
+ }
+
+ uv_refs_--;
+ }
+}
+
+
static void uv_timer_endgame(uv_timer_t* handle) {
if (handle->flags & UV_HANDLE_CLOSING) {
assert(!(handle->flags & UV_HANDLE_CLOSED));
@@ -714,6 +766,10 @@ static void uv_process_endgames() {
case UV_TCP:
uv_tcp_endgame((uv_tcp_t*)handle);
break;
+
+ case UV_NAMED_PIPE:
+ uv_pipe_endgame((uv_pipe_t*)handle);
+ break;
case UV_TIMER:
uv_timer_endgame((uv_timer_t*)handle);
@@ -749,6 +805,7 @@ static void uv_want_endgame(uv_handle_t* handle) {
static int uv_close_error(uv_handle_t* handle, uv_err_t e) {
uv_tcp_t* tcp;
+ uv_pipe_t* pipe;
if (handle->flags & UV_HANDLE_CLOSING) {
return 0;
@@ -767,6 +824,15 @@ static int uv_close_error(uv_handle_t* handle, uv_err_t e) {
uv_want_endgame(handle);
}
return 0;
+
+ case UV_NAMED_PIPE:
+ pipe = (uv_pipe_t*)handle;
+ pipe->flags &= ~(UV_HANDLE_READING | UV_HANDLE_LISTENING);
+ close_pipe(pipe, NULL, NULL);
+ if (pipe->reqs_pending == 0) {
+ uv_want_endgame(handle);
+ }
+ return 0;
case UV_TIMER:
uv_timer_stop((uv_timer_t*)handle);
@@ -871,7 +937,7 @@ int uv_tcp_bind6(uv_tcp_t* handle, struct sockaddr_in6 addr) {
}
-static void uv_queue_accept(uv_tcp_t* handle) {
+static void uv_tcp_queue_accept(uv_tcp_t* handle) {
uv_req_t* req;
BOOL success;
DWORD bytes;
@@ -902,6 +968,7 @@ static void uv_queue_accept(uv_tcp_t* handle) {
if (accept_socket == INVALID_SOCKET) {
req->error = uv_new_sys_error(WSAGetLastError());
uv_insert_pending_req(req);
+ handle->reqs_pending++;
return;
}
@@ -921,6 +988,7 @@ static void uv_queue_accept(uv_tcp_t* handle) {
/* Make this req pending reporting an error. */
req->error = uv_new_sys_error(WSAGetLastError());
uv_insert_pending_req(req);
+ handle->reqs_pending++;
/* Destroy the preallocated client socket. */
closesocket(accept_socket);
return;
@@ -933,7 +1001,48 @@ static void uv_queue_accept(uv_tcp_t* handle) {
}
-static void uv_queue_read(uv_tcp_t* handle) {
+static void uv_pipe_queue_accept(uv_pipe_t* handle) {
+ uv_req_t* req;
+ uv_pipe_instance_t* instance;
+ int i;
+
+ assert(handle->flags & UV_HANDLE_LISTENING);
+
+ /* This loop goes through every pipe instance and calls ConnectNamedPipe for every pending instance.
+ * TODO: Make this faster (we could maintain a linked list of pending instances).
+ */
+ for (i = 0; i < handle->connectionCount; i++) {
+ instance = &handle->connections[i];
+
+ if (instance->state == UV_PIPEINSTANCE_PENDING) {
+ /* Prepare the uv_req structure. */
+ req = &instance->accept_req;
+ uv_req_init(req, (uv_handle_t*)handle, NULL);
+ assert(!(req->flags & UV_REQ_PENDING));
+ req->type = UV_ACCEPT;
+ req->flags |= UV_REQ_PENDING;
+
+ /* Prepare the overlapped structure. */
+ memset(&(req->overlapped), 0, sizeof(req->overlapped));
+
+ if (!ConnectNamedPipe(instance->handle, &req->overlapped) &&
+ GetLastError() != ERROR_IO_PENDING && GetLastError() != ERROR_PIPE_CONNECTED) {
+ /* Make this req pending reporting an error. */
+ req->error = uv_new_sys_error(GetLastError());
+ uv_insert_pending_req(req);
+ handle->reqs_pending++;
+ continue;
+ }
+
+ instance->state = UV_PIPEINSTANCE_WAITING;
+ handle->reqs_pending++;
+ req->flags |= UV_REQ_PENDING;
+ }
+ }
+}
+
+
+static void uv_tcp_queue_read(uv_tcp_t* handle) {
uv_req_t* req;
uv_buf_t buf;
int result;
@@ -961,6 +1070,40 @@ static void uv_queue_read(uv_tcp_t* handle) {
/* Make this req pending reporting an error. */
req->error = uv_new_sys_error(WSAGetLastError());
uv_insert_pending_req(req);
+ handle->reqs_pending++;
+ return;
+ }
+
+ req->flags |= UV_REQ_PENDING;
+ handle->reqs_pending++;
+}
+
+
+static void uv_pipe_queue_read(uv_pipe_t* handle) {
+ uv_req_t* req;
+ int result;
+
+ assert(handle->flags & UV_HANDLE_READING);
+ assert(handle->connection);
+ assert(handle->connection->handle != INVALID_HANDLE_VALUE);
+
+ req = &handle->read_req;
+ assert(!(req->flags & UV_REQ_PENDING));
+ memset(&req->overlapped, 0, sizeof(req->overlapped));
+ req->type = UV_READ;
+
+ /* Do 0-read */
+ result = ReadFile(handle->connection->handle,
+ &uv_zero_,
+ 0,
+ NULL,
+ &req->overlapped);
+
+ if (!result && GetLastError() != ERROR_IO_PENDING) {
+ /* Make this req pending reporting an error. */
+ req->error = uv_new_sys_error(WSAGetLastError());
+ uv_insert_pending_req(req);
+ handle->reqs_pending++;
return;
}
@@ -993,40 +1136,76 @@ int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) {
handle->connection_cb = cb;
uv_req_init(&(handle->accept_req), (uv_handle_t*)handle, NULL);
- uv_queue_accept(handle);
+ uv_tcp_queue_accept(handle);
return 0;
}
-int uv_accept(uv_handle_t* server, uv_stream_t* client) {
+static int uv_tcp_accept(uv_tcp_t* server, uv_tcp_t* client) {
int rv = 0;
- uv_tcp_t* tcpServer = (uv_tcp_t*)server;
- uv_tcp_t* tcpClient = (uv_tcp_t*)client;
- if (tcpServer->accept_socket == INVALID_SOCKET) {
+ if (server->accept_socket == INVALID_SOCKET) {
uv_set_sys_error(WSAENOTCONN);
return -1;
}
- if (uv_tcp_set_socket(tcpClient, tcpServer->accept_socket) == -1) {
- closesocket(tcpServer->accept_socket);
+ if (uv_tcp_set_socket(client, server->accept_socket) == -1) {
+ closesocket(server->accept_socket);
rv = -1;
} else {
- uv_tcp_init_connection(tcpClient);
+ uv_init_connection((uv_stream_t*)client);
}
- tcpServer->accept_socket = INVALID_SOCKET;
+ server->accept_socket = INVALID_SOCKET;
- if (!(tcpServer->flags & UV_HANDLE_CLOSING)) {
- uv_queue_accept(tcpServer);
+ if (!(server->flags & UV_HANDLE_CLOSING)) {
+ uv_tcp_queue_accept(server);
}
return rv;
}
-int uv_read_start(uv_stream_t* handle, uv_alloc_cb alloc_cb, uv_read_cb read_cb) {
+static int uv_pipe_accept(uv_pipe_t* server, uv_pipe_t* client) {
+ assert(server->acceptConnection);
+
+ /* Make the connection instance active */
+ server->acceptConnection->state = UV_PIPEINSTANCE_ACTIVE;
+
+ /* Move the connection instance from server to client */
+ client->connection = server->acceptConnection;
+ server->acceptConnection = NULL;
+
+ /* Remember the server */
+ client->server = server;
+
+ uv_init_connection((uv_stream_t*)client);
+ client->flags |= UV_HANDLE_PIPESERVER;
+ uv_req_init(&(client->read_req), (uv_handle_t*)client, NULL);
+
+ if (!(server->flags & UV_HANDLE_CLOSING)) {
+ uv_pipe_queue_accept(server);
+ }
+
+ return 0;
+}
+
+
+int uv_accept(uv_handle_t* server, uv_stream_t* client) {
+ assert(client->type == server->type);
+
+ if (server->type == UV_TCP) {
+ return uv_tcp_accept((uv_tcp_t*)server, (uv_tcp_t*)client);
+ } else if (server->type == UV_NAMED_PIPE) {
+ return uv_pipe_accept((uv_pipe_t*)server, (uv_pipe_t*)client);
+ }
+
+ return -1;
+}
+
+
+static int uv_tcp_read_start(uv_tcp_t* handle, uv_alloc_cb alloc_cb, uv_read_cb read_cb) {
if (!(handle->flags & UV_HANDLE_CONNECTION)) {
uv_set_sys_error(WSAEINVAL);
return -1;
@@ -1049,12 +1228,52 @@ int uv_read_start(uv_stream_t* handle, uv_alloc_cb alloc_cb, uv_read_cb read_cb)
/* If reading was stopped and then started again, there could stell be a */
/* read request pending. */
if (!(handle->read_req.flags & UV_REQ_PENDING))
- uv_queue_read((uv_tcp_t*)handle);
+ uv_tcp_queue_read(handle);
return 0;
}
+static int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb, uv_read_cb read_cb) {
+ if (!(handle->flags & UV_HANDLE_CONNECTION)) {
+ uv_set_sys_error(UV_EINVAL);
+ return -1;
+ }
+
+ if (handle->flags & UV_HANDLE_READING) {
+ uv_set_sys_error(UV_EALREADY);
+ return -1;
+ }
+
+ if (handle->flags & UV_HANDLE_EOF) {
+ uv_set_sys_error(UV_EOF);
+ return -1;
+ }
+
+ handle->flags |= UV_HANDLE_READING;
+ handle->read_cb = read_cb;
+ handle->alloc_cb = alloc_cb;
+
+ /* If reading was stopped and then started again, there could stell be a */
+ /* read request pending. */
+ if (!(handle->read_req.flags & UV_REQ_PENDING))
+ uv_pipe_queue_read(handle);
+
+ return 0;
+}
+
+
+int uv_read_start(uv_stream_t* handle, uv_alloc_cb alloc_cb, uv_read_cb read_cb) {
+ if (handle->type == UV_TCP) {
+ return uv_tcp_read_start((uv_tcp_t*)handle, alloc_cb, read_cb);
+ } else if (handle->type == UV_NAMED_PIPE) {
+ return uv_pipe_read_start((uv_pipe_t*)handle, alloc_cb, read_cb);
+ }
+
+ return -1;
+}
+
+
int uv_read_stop(uv_stream_t* handle) {
handle->flags &= ~UV_HANDLE_READING;
@@ -1169,7 +1388,7 @@ static size_t uv_count_bufs(uv_buf_t bufs[], int count) {
}
-int uv_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) {
+int uv_tcp_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) {
int result;
DWORD bytes, err;
uv_tcp_t* handle = (uv_tcp_t*) req->handle;
@@ -1222,6 +1441,72 @@ int uv_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) {
}
+int uv_pipe_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) {
+ int result;
+ uv_pipe_t* handle = (uv_pipe_t*) req->handle;
+
+ assert(!(req->flags & UV_REQ_PENDING));
+
+ if (bufcnt != 1) {
+ uv_set_sys_error(UV_ENOTSUP);
+ return -1;
+ }
+
+ assert(handle->connection);
+ assert(handle->connection->handle != INVALID_HANDLE_VALUE);
+
+ if (!(req->handle->flags & UV_HANDLE_CONNECTION)) {
+ uv_set_sys_error(UV_EINVAL);
+ return -1;
+ }
+
+ if (req->handle->flags & UV_HANDLE_SHUTTING) {
+ uv_set_sys_error(UV_EOF);
+ return -1;
+ }
+
+ memset(&req->overlapped, 0, sizeof(req->overlapped));
+ req->type = UV_WRITE;
+
+ result = WriteFile(handle->connection->handle,
+ bufs[0].base,
+ bufs[0].len,
+ NULL,
+ &req->overlapped);
+
+ if (!result && GetLastError() != WSA_IO_PENDING) {
+ uv_set_sys_error(GetLastError());
+ return -1;
+ }
+
+ if (result) {
+ /* Request completed immediately. */
+ req->queued_bytes = 0;
+ } else {
+ /* Request queued by the kernel. */
+ req->queued_bytes = uv_count_bufs(bufs, bufcnt);
+ handle->write_queue_size += req->queued_bytes;
+ }
+
+ req->flags |= UV_REQ_PENDING;
+ handle->reqs_pending++;
+ handle->write_reqs_pending++;
+
+ return 0;
+}
+
+
+int uv_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) {
+ if (req->handle->type == UV_TCP) {
+ return uv_tcp_write(req, bufs, bufcnt);
+ } else if (req->handle->type == UV_NAMED_PIPE) {
+ return uv_pipe_write(req, bufs, bufcnt);
+ }
+
+ return -1;
+}
+
+
int uv_shutdown(uv_req_t* req) {
uv_tcp_t* handle = (uv_tcp_t*) req->handle;
int status = 0;
@@ -1240,7 +1525,7 @@ int uv_shutdown(uv_req_t* req) {
req->flags |= UV_REQ_PENDING;
handle->flags |= UV_HANDLE_SHUTTING;
- handle->shutdown_req = req;
+ handle->shutdown_req = req;
handle->reqs_pending++;
uv_want_endgame((uv_handle_t*)handle);
@@ -1332,7 +1617,7 @@ static void uv_tcp_return_req(uv_tcp_t* handle, uv_req_t* req) {
}
/* Post another 0-read if still reading and not closing. */
if (handle->flags & UV_HANDLE_READING) {
- uv_queue_read(handle);
+ uv_tcp_queue_read(handle);
}
break;
@@ -1369,7 +1654,7 @@ static void uv_tcp_return_req(uv_tcp_t* handle, uv_req_t* req) {
/* uv_queue_accept will detect it. */
closesocket(handle->accept_socket);
if (handle->flags & UV_HANDLE_LISTENING) {
- uv_queue_accept(handle);
+ uv_tcp_queue_accept(handle);
}
}
break;
@@ -1382,7 +1667,7 @@ static void uv_tcp_return_req(uv_tcp_t* handle, uv_req_t* req) {
SO_UPDATE_CONNECT_CONTEXT,
NULL,
0) == 0) {
- uv_tcp_init_connection(handle);
+ uv_init_connection((uv_stream_t*)handle);
((uv_connect_cb)req->cb)(req, 0);
} else {
uv_set_sys_error(WSAGetLastError());
@@ -1411,6 +1696,161 @@ static void uv_tcp_return_req(uv_tcp_t* handle, uv_req_t* req) {
}
+static void uv_pipe_return_req(uv_pipe_t* handle, uv_req_t* req) {
+ DWORD bytes, err, mode;
+ uv_buf_t buf;
+ uv_pipe_instance_t* acceptingConn;
+
+ assert(handle->type == UV_NAMED_PIPE);
+
+ /* Mark the request non-pending */
+ req->flags &= ~UV_REQ_PENDING;
+
+ switch (req->type) {
+ case UV_WRITE:
+ handle->write_queue_size -= req->queued_bytes;
+ if (req->cb) {
+ uv_last_error_ = req->error;
+ ((uv_write_cb)req->cb)(req, uv_last_error_.code == UV_OK ? 0 : -1);
+ }
+ handle->write_reqs_pending--;
+ if (handle->write_reqs_pending == 0 &&
+ handle->flags & UV_HANDLE_SHUTTING) {
+ uv_want_endgame((uv_handle_t*)handle);
+ }
+ break;
+
+ case UV_READ:
+ if (req->error.code != UV_OK) {
+ /* An error occurred doing the 0-read. */
+ if (!(handle->flags & UV_HANDLE_READING)) {
+ break;
+ }
+
+ /* Stop reading and report error. */
+ handle->flags &= ~UV_HANDLE_READING;
+ uv_last_error_ = req->error;
+ buf.base = 0;
+ buf.len = 0;
+ handle->read_cb((uv_stream_t*)handle, -1, buf);
+ break;
+ }
+
+ /* Temporarily switch to non-blocking mode.
+ * This is so that ReadFile doesn't block if the read buffer is empty.
+ */
+ mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_NOWAIT;
+ if (!SetNamedPipeHandleState(handle->connection->handle, &mode, NULL, NULL)) {
+ /* We can't continue processing this read. */
+ err = GetLastError();
+ uv_set_sys_error(err);
+ handle->read_cb((uv_stream_t*)handle, -1, buf);
+ break;
+ }
+
+ /* Do non-blocking reads until the buffer is empty */
+ while (handle->flags & UV_HANDLE_READING) {
+ buf = handle->alloc_cb((uv_stream_t*)handle, 65536);
+ assert(buf.len > 0);
+
+ if (ReadFile(handle->connection->handle,
+ buf.base,
+ buf.len,
+ &bytes,
+ NULL)) {
+ if (bytes > 0) {
+ /* Successful read */
+ handle->read_cb((uv_stream_t*)handle, bytes, buf);
+ /* Read again only if bytes == buf.len */
+ if (bytes < buf.len) {
+ break;
+ }
+ } else {
+ /* Connection closed */
+ handle->flags &= ~UV_HANDLE_READING;
+ handle->flags |= UV_HANDLE_EOF;
+ uv_last_error_.code = UV_EOF;
+ uv_last_error_.sys_errno_ = ERROR_SUCCESS;
+ handle->read_cb((uv_stream_t*)handle, -1, buf);
+ break;
+ }
+ } else {
+ err = GetLastError();
+ if (err == ERROR_NO_DATA) {
+ /* Read buffer was completely empty, report a 0-byte read. */
+ uv_set_sys_error(UV_EAGAIN);
+ handle->read_cb((uv_stream_t*)handle, 0, buf);
+ } else {
+ /* Ouch! serious error. */
+ uv_set_sys_error(err);
+ handle->read_cb((uv_stream_t*)handle, -1, buf);
+ }
+ break;
+ }
+ }
+
+ if (handle->flags & UV_HANDLE_READING) {
+ /* Switch back to blocking mode so that we can use IOCP for 0-reads */
+ mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT;
+ if (!SetNamedPipeHandleState(handle->connection->handle, &mode, NULL, NULL)) {
+ /* Report and continue. */
+ err = GetLastError();
+ uv_set_sys_error(err);
+ handle->read_cb((uv_stream_t*)handle, -1, buf);
+ break;
+ }
+
+ /* Post another 0-read if still reading and not closing. */
+ uv_pipe_queue_read(handle);
+ }
+
+ break;
+
+ case UV_ACCEPT:
+ if (req->error.code == UV_OK) {
+ /* Put the connection instance into accept state */
+ handle->acceptConnection = uv_req_to_pipeinstance(req);
+ handle->acceptConnection->state = UV_PIPEINSTANCE_ACCEPTED;
+
+ if (handle->connection_cb) {
+ handle->connection_cb((uv_handle_t*)handle, 0);
+ }
+ } else {
+ /* Ignore errors and continue listening */
+ if (handle->flags & UV_HANDLE_LISTENING) {
+ uv_pipe_queue_accept(handle);
+ }
+ }
+ break;
+
+ case UV_CONNECT:
+ if (req->cb) {
+ if (req->error.code == UV_OK) {
+ uv_init_connection((uv_stream_t*)handle);
+ ((uv_connect_cb)req->cb)(req, 0);
+ } else {
+ uv_last_error_ = req->error;
+ ((uv_connect_cb)req->cb)(req, -1);
+ }
+ }
+ break;
+
+ default:
+ assert(0);
+ }
+
+ /* The number of pending requests is now down by one */
+ handle->reqs_pending--;
+
+ /* Queue the handle's close callback if it is closing and there are no */
+ /* more pending requests. */
+ if (handle->flags & UV_HANDLE_CLOSING &&
+ handle->reqs_pending == 0) {
+ uv_want_endgame((uv_handle_t*)handle);
+ }
+}
+
+
static int uv_timer_compare(uv_timer_t* a, uv_timer_t* b) {
if (a->due < b->due)
return -1;
@@ -1708,6 +2148,10 @@ static void uv_process_reqs() {
case UV_TCP:
uv_tcp_return_req((uv_tcp_t*)handle, req);
break;
+
+ case UV_NAMED_PIPE:
+ uv_pipe_return_req((uv_pipe_t*)handle, req);
+ break;
case UV_ASYNC:
uv_async_return_req((uv_async_t*)handle, req);
@@ -2454,3 +2898,253 @@ error:
return -1;
}
+
+int uv_pipe_init(uv_pipe_t* handle) {
+ uv_stream_init((uv_stream_t*)handle);
+
+ handle->type = UV_NAMED_PIPE;
+ handle->reqs_pending = 0;
+
+ uv_counters()->pipe_init++;
+
+ return 0;
+}
+
+
+/* Creates a pipe server. */
+/* TODO: make this work with UTF8 name */
+int uv_pipe_create(uv_pipe_t* handle, char* name) {
+ if (!name) {
+ return -1;
+ }
+
+ handle->connections = NULL;
+ handle->acceptConnection = NULL;
+ handle->connectionCount = 0;
+
+ /* Make our own copy of the pipe name */
+ handle->name = (char*)malloc(MAX_PIPENAME_LEN);
+ if (!handle->name) {
+ uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
+ }
+ strcpy(handle->name, name);
+ handle->name[255] = '\0';
+
+ handle->flags |= UV_HANDLE_PIPESERVER;
+ return 0;
+}
+
+
+/* Starts listening for connections for the given pipe. */
+int uv_pipe_listen(uv_pipe_t* handle, int instanceCount, uv_connection_cb cb) {
+ int i, maxInstances, errno;
+ HANDLE pipeHandle;
+ uv_pipe_instance_t* pipeInstance;
+
+ if (handle->flags & UV_HANDLE_LISTENING ||
+ handle->flags & UV_HANDLE_READING) {
+ uv_set_sys_error(UV_EALREADY);
+ return -1;
+ }
+
+ if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
+ uv_set_sys_error(UV_ENOTSUP);
+ return -1;
+ }
+
+ if (instanceCount <= sizeof(handle->connectionsBuffer)) {
+ /* Use preallocated connections buffer */
+ handle->connections = handle->connectionsBuffer;
+ } else {
+ handle->connections = (uv_pipe_instance_t*)malloc(instanceCount * sizeof(uv_pipe_instance_t));
+ if (!handle->connections) {
+ uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
+ }
+ }
+
+ maxInstances = instanceCount >= PIPE_UNLIMITED_INSTANCES ? PIPE_UNLIMITED_INSTANCES : instanceCount;
+
+ for (i = 0; i < instanceCount; i++) {
+ pipeHandle = CreateNamedPipe(handle->name,
+ PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
+ PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
+ maxInstances,
+ 65536,
+ 65536,
+ 0,
+ NULL);
+
+ if (pipeHandle == INVALID_HANDLE_VALUE) {
+ errno = GetLastError();
+ goto error;
+ }
+
+ if (CreateIoCompletionPort(pipeHandle,
+ uv_iocp_,
+ (ULONG_PTR)handle,
+ 0) == NULL) {
+ errno = GetLastError();
+ goto error;
+ }
+
+ pipeInstance = &handle->connections[i];
+ pipeInstance->handle = pipeHandle;
+ pipeInstance->state = UV_PIPEINSTANCE_PENDING;
+ }
+
+ /* We don't need the pipe name anymore. */
+ free(handle->name);
+ handle->name = NULL;
+
+ handle->connectionCount = instanceCount;
+ handle->flags |= UV_HANDLE_LISTENING;
+ handle->connection_cb = cb;
+
+ uv_pipe_queue_accept(handle);
+ return 0;
+
+error:
+ close_pipe(handle, NULL, NULL);
+ uv_set_sys_error(errno);
+ return -1;
+}
+
+/* TODO: make this work with UTF8 name */
+int uv_pipe_connect(uv_req_t* req, char* name) {
+ int errno;
+ DWORD mode;
+ uv_pipe_t* handle = (uv_pipe_t*)req->handle;
+
+ assert(!(req->flags & UV_REQ_PENDING));
+
+ req->type = UV_CONNECT;
+ handle->connection = &handle->clientConnection;
+ handle->server = NULL;
+ memset(&req->overlapped, 0, sizeof(req->overlapped));
+
+ handle->clientConnection.handle = CreateFile(name,
+ GENERIC_READ | GENERIC_WRITE,
+ 0,
+ NULL,
+ OPEN_EXISTING,
+ FILE_FLAG_OVERLAPPED,
+ NULL);
+
+ if (handle->clientConnection.handle == INVALID_HANDLE_VALUE &&
+ GetLastError() != ERROR_IO_PENDING) {
+ errno = GetLastError();
+ goto error;
+ }
+
+ mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT;
+
+ if (!SetNamedPipeHandleState(handle->clientConnection.handle, &mode, NULL, NULL)) {
+ errno = GetLastError();
+ goto error;
+ }
+
+ if (CreateIoCompletionPort(handle->clientConnection.handle,
+ uv_iocp_,
+ (ULONG_PTR)handle,
+ 0) == NULL) {
+ errno = GetLastError();
+ goto error;
+ }
+
+ req->error = uv_ok_;
+ req->flags |= UV_REQ_PENDING;
+ handle->connection->state = UV_PIPEINSTANCE_ACTIVE;
+ uv_insert_pending_req(req);
+ handle->reqs_pending++;
+ return 0;
+
+error:
+ close_pipe(handle, NULL, NULL);
+ req->error = uv_new_sys_error(errno);
+ uv_insert_pending_req(req);
+ handle->reqs_pending++;
+ return 0;
+}
+
+
+/* Cleans up uv_pipe_t (server or connection) and all resources associated with it */
+static void close_pipe(uv_pipe_t* handle, int* status, uv_err_t* err) {
+ uv_pipe_instance_t* connection;
+ int i;
+
+ if (handle->flags & UV_HANDLE_PIPESERVER) {
+ if (handle->flags & UV_HANDLE_CONNECTION) {
+ /*
+ * The handle is for a connection instance on the pipe server.
+ * To clean-up, we call DisconnectNamedPipe, and return the instance to pending state,
+ * which will be ready to accept another pipe connection in uv_pipe_queue_accept.
+ */
+ connection = handle->connection;
+ if (connection && connection->state != UV_PIPEINSTANCE_PENDING && connection->handle != INVALID_HANDLE_VALUE) {
+ /* Disconnect the connection intance and return it to pending state */
+ if (DisconnectNamedPipe(connection->handle)) {
+ connection->state = UV_PIPEINSTANCE_PENDING;
+ handle->connection = NULL;
+ if (status) *status = 0;
+ } else {
+ if (status) *status = -1;
+ if (err) *err = uv_new_sys_error(GetLastError());
+ }
+
+ /* Queue accept now that the instance is in pending state. */
+ if (!(handle->server->flags & UV_HANDLE_CLOSING)) {
+ uv_pipe_queue_accept(handle->server);
+ }
+ }
+ } else {
+ /*
+ * The handle is for the pipe server.
+ * To clean-up we close every connection instance that was made in uv_pipe_listen.
+ */
+
+ if (handle->name) {
+ free(handle->name);
+ handle->name = NULL;
+ }
+
+ if (handle->connections) {
+ /* Go through the list of connections, and close each one with CloseHandle. */
+ for (i = 0; i < handle->connectionCount; i++) {
+ connection = &handle->connections[i];
+ if (connection->state != UV_PIPEINSTANCE_DISCONNECTED && connection->handle != INVALID_HANDLE_VALUE) {
+ CloseHandle(connection->handle);
+ connection->state = UV_PIPEINSTANCE_DISCONNECTED;
+ connection->handle = INVALID_HANDLE_VALUE;
+ }
+ }
+
+ /* Free the connections buffer. */
+ if (handle->connections != handle->connectionsBuffer) {
+ free(handle->connections);
+ }
+
+ handle->connections = NULL;
+ }
+
+ if (status) *status = 0;
+ }
+ } else {
+ /*
+ * The handle is for a connection instance on the pipe client.
+ * To clean-up
+ */
+ connection = handle->connection;
+ if (connection && connection->handle != INVALID_HANDLE_VALUE) {
+ if (CloseHandle(connection->handle)) {
+ connection->state = UV_PIPEINSTANCE_DISCONNECTED;
+ handle->connection = NULL;
+ if (status) *status = 0;
+ } else {
+ if (status) *status = -1;
+ if (err) *err = uv_new_sys_error(GetLastError());
+ }
+ }
+ }
+
+ handle->flags |= UV_HANDLE_SHUT;
+}
diff --git a/test/benchmark-list.h b/test/benchmark-list.h
index 6040e90..5e79878 100644
--- a/test/benchmark-list.h
+++ b/test/benchmark-list.h
@@ -21,25 +21,35 @@
BENCHMARK_DECLARE (sizes)
BENCHMARK_DECLARE (ping_pongs)
-BENCHMARK_DECLARE (pump100_client)
-BENCHMARK_DECLARE (pump1_client)
+BENCHMARK_DECLARE (tcp_pump100_client)
+BENCHMARK_DECLARE (tcp_pump1_client)
+BENCHMARK_DECLARE (pipe_pump100_client)
+BENCHMARK_DECLARE (pipe_pump1_client)
BENCHMARK_DECLARE (gethostbyname)
BENCHMARK_DECLARE (getaddrinfo)
-HELPER_DECLARE (pump_server)
-HELPER_DECLARE (echo_server)
+HELPER_DECLARE (tcp_pump_server)
+HELPER_DECLARE (pipe_pump_server)
+HELPER_DECLARE (tcp4_echo_server)
+HELPER_DECLARE (pipe_echo_server)
HELPER_DECLARE (dns_server)
TASK_LIST_START
BENCHMARK_ENTRY (sizes)
BENCHMARK_ENTRY (ping_pongs)
- BENCHMARK_HELPER (ping_pongs, echo_server)
+ BENCHMARK_HELPER (ping_pongs, tcp4_echo_server)
- BENCHMARK_ENTRY (pump100_client)
- BENCHMARK_HELPER (pump100_client, pump_server)
+ BENCHMARK_ENTRY (tcp_pump100_client)
+ BENCHMARK_HELPER (tcp_pump100_client, tcp_pump_server)
- BENCHMARK_ENTRY (pump1_client)
- BENCHMARK_HELPER (pump1_client, pump_server)
+ BENCHMARK_ENTRY (tcp_pump1_client)
+ BENCHMARK_HELPER (tcp_pump1_client, tcp_pump_server)
+
+ BENCHMARK_ENTRY (pipe_pump100_client)
+ BENCHMARK_HELPER (pipe_pump100_client, pipe_pump_server)
+
+ BENCHMARK_ENTRY (pipe_pump1_client)
+ BENCHMARK_HELPER (pipe_pump1_client, pipe_pump_server)
BENCHMARK_ENTRY (gethostbyname)
BENCHMARK_HELPER (gethostbyname, dns_server)
diff --git a/test/benchmark-pump.c b/test/benchmark-pump.c
index cd9c7d9..591dbb0 100644
--- a/test/benchmark-pump.c
+++ b/test/benchmark-pump.c
@@ -45,7 +45,9 @@ static uv_buf_t buf_alloc(uv_stream_t*, size_t size);
static void buf_free(uv_buf_t uv_buf_t);
-static uv_tcp_t server;
+static uv_tcp_t tcpServer;
+static uv_pipe_t pipeServer;
+static uv_handle_t* server;
static struct sockaddr_in listen_addr;
static struct sockaddr_in connect_addr;
@@ -68,7 +70,10 @@ static char write_buffer[WRITE_BUFFER_SIZE];
/* Make this as large as you need. */
#define MAX_WRITE_HANDLES 1000
-static uv_tcp_t write_handles[MAX_WRITE_HANDLES];
+static stream_type type;
+
+static uv_tcp_t tcp_write_handles[MAX_WRITE_HANDLES];
+static uv_pipe_t pipe_write_handles[MAX_WRITE_HANDLES];
static uv_timer_t timer_handle;
@@ -81,6 +86,7 @@ static double gbit(int64_t bytes, int64_t passed_ms) {
static void show_stats(uv_timer_t* handle, int status) {
int64_t diff;
+ int i;
#if PRINT_STATS
LOGF("connections: %d, write: %.1f gbit/s\n",
@@ -94,9 +100,13 @@ static void show_stats(uv_timer_t* handle, int status) {
uv_update_time();
diff = uv_now() - start_time;
- LOGF("pump%d_client: %.1f gbit/s\n", write_sockets,
+ LOGF("%s_pump%d_client: %.1f gbit/s\n", type == TCP ? "tcp" : "pipe", write_sockets,
gbit(nsent_total, diff));
+ for (i = 0; i < write_sockets; i++) {
+ uv_close(type == TCP ? (uv_handle_t*)&tcp_write_handles[i] : (uv_handle_t*)&pipe_write_handles[i], NULL);
+ }
+
exit(0);
}
@@ -112,7 +122,7 @@ static void read_show_stats() {
uv_update_time();
diff = uv_now() - start_time;
- LOGF("pump%d_server: %.1f gbit/s\n", max_read_sockets,
+ LOGF("%s_pump%d_server: %.1f gbit/s\n", type == TCP ? "tcp" : "pipe", max_read_sockets,
gbit(nrecv_total, diff));
}
@@ -133,7 +143,7 @@ void read_sockets_close_cb(uv_handle_t* handle) {
*/
if (uv_now() - start_time > 1000 && read_sockets == 0) {
read_show_stats();
- uv_close((uv_handle_t*)&server, NULL);
+ uv_close(server, NULL);
}
}
@@ -154,7 +164,7 @@ static void start_stats_collection() {
}
-static void read_cb(uv_stream_t* tcp, ssize_t bytes, uv_buf_t buf) {
+static void read_cb(uv_stream_t* stream, ssize_t bytes, uv_buf_t buf) {
if (nrecv_total == 0) {
ASSERT(start_time == 0);
uv_update_time();
@@ -162,7 +172,7 @@ static void read_cb(uv_stream_t* tcp, ssize_t bytes, uv_buf_t buf) {
}
if (bytes < 0) {
- uv_close((uv_handle_t*)tcp, read_sockets_close_cb);
+ uv_close((uv_handle_t*)stream, read_sockets_close_cb);
return;
}
@@ -187,7 +197,7 @@ static void write_cb(uv_req_t *req, int status) {
}
-static void do_write(uv_stream_t* tcp) {
+static void do_write(uv_stream_t* stream) {
uv_req_t* req;
uv_buf_t buf;
int r;
@@ -195,9 +205,9 @@ static void do_write(uv_stream_t* tcp) {
buf.base = (char*) &write_buffer;
buf.len = sizeof write_buffer;
- while (tcp->write_queue_size == 0) {
+ while (stream->write_queue_size == 0) {
req = req_alloc();
- uv_req_init(req, (uv_handle_t*)tcp, write_cb);
+ uv_req_init(req, (uv_handle_t*)stream, write_cb);
r = uv_write(req, &buf, 1);
ASSERT(r == 0);
@@ -221,7 +231,7 @@ static void connect_cb(uv_req_t* req, int status) {
/* Yay! start writing */
for (i = 0; i < write_sockets; i++) {
- do_write((uv_stream_t*)&write_handles[i]);
+ do_write(type == TCP ? (uv_stream_t*)&tcp_write_handles[i] : (uv_stream_t*)&pipe_write_handles[i]);
}
}
}
@@ -230,38 +240,55 @@ static void connect_cb(uv_req_t* req, int status) {
static void maybe_connect_some() {
uv_req_t* req;
uv_tcp_t* tcp;
+ uv_pipe_t* pipe;
int r;
while (max_connect_socket < TARGET_CONNECTIONS &&
max_connect_socket < write_sockets + MAX_SIMULTANEOUS_CONNECTS) {
- tcp = &write_handles[max_connect_socket++];
-
- r = uv_tcp_init(tcp);
- ASSERT(r == 0);
-
- req = req_alloc();
- uv_req_init(req, (uv_handle_t*)tcp, connect_cb);
- r = uv_tcp_connect(req, connect_addr);
- ASSERT(r == 0);
+ if (type == TCP) {
+ tcp = &tcp_write_handles[max_connect_socket++];
+
+ r = uv_tcp_init(tcp);
+ ASSERT(r == 0);
+
+ req = req_alloc();
+ uv_req_init(req, (uv_handle_t*)tcp, connect_cb);
+ r = uv_tcp_connect(req, connect_addr);
+ ASSERT(r == 0);
+ } else {
+ pipe = &pipe_write_handles[max_connect_socket++];
+
+ r = uv_pipe_init(pipe);
+ ASSERT(r == 0);
+
+ req = req_alloc();
+ uv_req_init(req, (uv_handle_t*)pipe, connect_cb);
+ r = uv_pipe_connect(req, TEST_PIPENAME);
+ ASSERT(r == 0);
+ }
}
}
static void connection_cb(uv_handle_t* s, int status) {
- uv_tcp_t* tcp;
+ uv_stream_t* stream;
int r;
- ASSERT(&server == (uv_tcp_t*)s);
+ ASSERT(server == s);
ASSERT(status == 0);
- tcp = malloc(sizeof(uv_tcp_t));
-
- uv_tcp_init(tcp);
+ if (type == TCP) {
+ stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
+ uv_tcp_init((uv_tcp_t*)stream);
+ } else {
+ stream = (uv_stream_t*)malloc(sizeof(uv_pipe_t));
+ uv_pipe_init((uv_pipe_t*)stream);
+ }
- r = uv_accept(s, (uv_stream_t*)tcp);
+ r = uv_accept(s, stream);
ASSERT(r == 0);
- r = uv_read_start((uv_stream_t*)tcp, buf_alloc, read_cb);
+ r = uv_read_start(stream, buf_alloc, read_cb);
ASSERT(r == 0);
read_sockets++;
@@ -317,7 +344,7 @@ typedef struct buf_list_s {
static buf_list_t* buf_freelist = NULL;
-static uv_buf_t buf_alloc(uv_stream_t* tcp, size_t size) {
+static uv_buf_t buf_alloc(uv_stream_t* stream, size_t size) {
buf_list_t* buf;
buf = buf_freelist;
@@ -342,18 +369,20 @@ static void buf_free(uv_buf_t uv_buf_t) {
}
-HELPER_IMPL(pump_server) {
+HELPER_IMPL(tcp_pump_server) {
int r;
+ type = TCP;
uv_init();
listen_addr = uv_ip4_addr("0.0.0.0", TEST_PORT);
/* Server */
- r = uv_tcp_init(&server);
+ server = (uv_handle_t*)&tcpServer;
+ r = uv_tcp_init(&tcpServer);
ASSERT(r == 0);
- r = uv_tcp_bind(&server, listen_addr);
+ r = uv_tcp_bind(&tcpServer, listen_addr);
ASSERT(r == 0);
- r = uv_tcp_listen(&server, MAX_WRITE_HANDLES, connection_cb);
+ r = uv_tcp_listen(&tcpServer, MAX_WRITE_HANDLES, connection_cb);
ASSERT(r == 0);
uv_run();
@@ -362,9 +391,31 @@ HELPER_IMPL(pump_server) {
}
-void pump(int n) {
+HELPER_IMPL(pipe_pump_server) {
+ int r;
+ type = PIPE;
+
+ uv_init();
+
+ /* Server */
+ server = (uv_handle_t*)&pipeServer;
+ r = uv_pipe_init(&pipeServer);
+ ASSERT(r == 0);
+ r = uv_pipe_create(&pipeServer, TEST_PIPENAME);
+ ASSERT(r == 0);
+ r = uv_pipe_listen(&pipeServer, MAX_WRITE_HANDLES, connection_cb);
+ ASSERT(r == 0);
+
+ uv_run();
+
+ return 0;
+}
+
+
+void tcp_pump(int n) {
ASSERT(n <= MAX_WRITE_HANDLES);
TARGET_CONNECTIONS = n;
+ type = TCP;
uv_init();
@@ -377,13 +428,39 @@ void pump(int n) {
}
-BENCHMARK_IMPL(pump100_client) {
- pump(100);
+void pipe_pump(int n) {
+ ASSERT(n <= MAX_WRITE_HANDLES);
+ TARGET_CONNECTIONS = n;
+ type = PIPE;
+
+ uv_init();
+
+ /* Start making connections */
+ maybe_connect_some();
+
+ uv_run();
+}
+
+
+BENCHMARK_IMPL(tcp_pump100_client) {
+ tcp_pump(100);
+ return 0;
+}
+
+
+BENCHMARK_IMPL(tcp_pump1_client) {
+ tcp_pump(1);
+ return 0;
+}
+
+
+BENCHMARK_IMPL(pipe_pump100_client) {
+ pipe_pump(100);
return 0;
}
-BENCHMARK_IMPL(pump1_client) {
- pump(1);
+BENCHMARK_IMPL(pipe_pump1_client) {
+ pipe_pump(1);
return 0;
}
diff --git a/test/benchmark-sizes.c b/test/benchmark-sizes.c
index 830de3a..9038645 100644
--- a/test/benchmark-sizes.c
+++ b/test/benchmark-sizes.c
@@ -26,6 +26,7 @@
BENCHMARK_IMPL(sizes) {
LOGF("uv_req_t: %u bytes\n", (unsigned int) sizeof(uv_req_t));
LOGF("uv_tcp_t: %u bytes\n", (unsigned int) sizeof(uv_tcp_t));
+ LOGF("uv_pipe_t: %u bytes\n", (unsigned int) sizeof(uv_pipe_t));
LOGF("uv_prepare_t: %u bytes\n", (unsigned int) sizeof(uv_prepare_t));
LOGF("uv_check_t: %u bytes\n", (unsigned int) sizeof(uv_check_t));
LOGF("uv_idle_t: %u bytes\n", (unsigned int) sizeof(uv_idle_t));
diff --git a/test/echo-server.c b/test/echo-server.c
index 9addc54..b8c3500 100644
--- a/test/echo-server.c
+++ b/test/echo-server.c
@@ -24,19 +24,16 @@
#include <stdio.h>
#include <stdlib.h>
-
typedef struct {
uv_req_t req;
uv_buf_t buf;
} write_req_t;
-
static int server_closed;
-static uv_tcp_t server;
-
-static int server6_closed;
-static uv_tcp_t server6;
-
+static stream_type serverType;
+static uv_tcp_t tcpServer;
+static uv_pipe_t pipeServer;
+static uv_handle_t* server;
static void after_write(uv_req_t* req, int status);
static void after_read(uv_stream_t*, ssize_t nread, uv_buf_t buf);
@@ -98,10 +95,8 @@ static void after_read(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
if (!server_closed) {
for (i = 0; i < nread; i++) {
if (buf.base[i] == 'Q') {
- uv_close((uv_handle_t*)&server, on_server_close);
+ uv_close(server, on_server_close);
server_closed = 1;
- uv_close((uv_handle_t*)&server6, on_server_close);
- server6_closed = 1;
}
}
}
@@ -131,7 +126,7 @@ static uv_buf_t echo_alloc(uv_stream_t* handle, size_t suggested_size) {
static void on_connection(uv_handle_t* server, int status) {
- uv_tcp_t* handle;
+ uv_handle_t* handle;
int r;
if (status != 0) {
@@ -139,10 +134,17 @@ static void on_connection(uv_handle_t* server, int status) {
}
ASSERT(status == 0);
- handle = (uv_tcp_t*) malloc(sizeof *handle);
- ASSERT(handle != NULL);
+ if (serverType == TCP) {
+ handle = (uv_handle_t*) malloc(sizeof(uv_tcp_t));
+ ASSERT(handle != NULL);
- uv_tcp_init(handle);
+ uv_tcp_init((uv_tcp_t*)handle);
+ } else {
+ handle = (uv_handle_t*) malloc(sizeof(uv_pipe_t));
+ ASSERT(handle != NULL);
+
+ uv_pipe_init((uv_pipe_t*)handle);
+ }
/* associate server with stream */
handle->data = server;
@@ -156,37 +158,50 @@ static void on_connection(uv_handle_t* server, int status) {
static void on_server_close(uv_handle_t* handle) {
- ASSERT(handle == (uv_handle_t*)&server || handle == (uv_handle_t*)&server6);
+ ASSERT(handle == server);
}
-static int echo_start(int port) {
+static int tcp4_echo_start(int port) {
struct sockaddr_in addr = uv_ip4_addr("0.0.0.0", port);
- struct sockaddr_in6 addr6 = uv_ip6_addr("::1", port);
int r;
-
- r = uv_tcp_init(&server);
+
+ server = (uv_handle_t*)&tcpServer;
+ serverType = TCP;
+
+ r = uv_tcp_init(&tcpServer);
if (r) {
/* TODO: Error codes */
fprintf(stderr, "Socket creation error\n");
return 1;
}
- r = uv_tcp_bind(&server, addr);
+ r = uv_tcp_bind(&tcpServer, addr);
if (r) {
/* TODO: Error codes */
fprintf(stderr, "Bind error\n");
return 1;
}
- r = uv_tcp_listen(&server, 128, on_connection);
+ r = uv_tcp_listen(&tcpServer, 128, on_connection);
if (r) {
/* TODO: Error codes */
fprintf(stderr, "Listen error\n");
return 1;
}
- r = uv_tcp_init(&server6);
+ return 0;
+}
+
+
+static int tcp6_echo_start(int port) {
+ struct sockaddr_in6 addr6 = uv_ip6_addr("::1", port);
+ int r;
+
+ server = (uv_handle_t*)&tcpServer;
+ serverType = TCP;
+
+ r = uv_tcp_init(&tcpServer);
if (r) {
/* TODO: Error codes */
fprintf(stderr, "Socket creation error\n");
@@ -194,14 +209,45 @@ static int echo_start(int port) {
}
/* IPv6 is optional as not all platforms support it */
- r = uv_tcp_bind6(&server6, addr6);
+ r = uv_tcp_bind6(&tcpServer, addr6);
if (r) {
/* show message but return OK */
fprintf(stderr, "IPv6 not supported\n");
return 0;
}
- r = uv_tcp_listen(&server6, 128, on_connection);
+ r = uv_tcp_listen(&tcpServer, 128, on_connection);
+ if (r) {
+ /* TODO: Error codes */
+ fprintf(stderr, "Listen error\n");
+ return 1;
+ }
+
+ return 0;
+}
+
+
+static int pipe_echo_start(char* pipeName) {
+ int r;
+
+ server = (uv_handle_t*)&pipeServer;
+ serverType = PIPE;
+
+ r = uv_pipe_init(&pipeServer);
+ if (r) {
+ /* TODO: Error codes */
+ fprintf(stderr, "Pipe creation error\n");
+ return 1;
+ }
+
+ r = uv_pipe_create(&pipeServer, pipeName);
+ if (r) {
+ /* TODO: Error codes */
+ fprintf(stderr, "create error\n");
+ return 1;
+ }
+
+ r = uv_pipe_listen(&pipeServer, 1, on_connection);
if (r) {
/* TODO: Error codes */
fprintf(stderr, "Listen error on IPv6\n");
@@ -212,9 +258,30 @@ static int echo_start(int port) {
}
-HELPER_IMPL(echo_server) {
+HELPER_IMPL(tcp4_echo_server) {
uv_init();
- if (echo_start(TEST_PORT))
+ if (tcp4_echo_start(TEST_PORT))
+ return 1;
+
+ uv_run();
+ return 0;
+}
+
+
+HELPER_IMPL(tcp6_echo_server) {
+ uv_init();
+ if (tcp6_echo_start(TEST_PORT))
+ return 1;
+
+ uv_run();
+ return 0;
+}
+
+
+HELPER_IMPL(pipe_echo_server) {
+ uv_init();
+
+ if (pipe_echo_start(TEST_PIPENAME))
return 1;
uv_run();
diff --git a/test/task.h b/test/task.h
index 8d9a1e8..d47c209 100644
--- a/test/task.h
+++ b/test/task.h
@@ -30,6 +30,17 @@
#define TEST_PORT 9123
#define TEST_PORT_2 9124
+#ifdef _WIN32
+# define TEST_PIPENAME "\\\\.\\pipe\\uv-test"
+#else
+# /* TODO: define unix pipe name */
+# define TEST_PIPENAME ""
+#endif
+
+typedef enum {
+ TCP = 0,
+ PIPE
+} stream_type;
/* Log to stderr. */
#define LOG(...) fprintf(stderr, "%s", __VA_ARGS__)
diff --git a/test/test-list.h b/test/test-list.h
index 190574a..0cee34f 100644
--- a/test/test-list.h
+++ b/test/test-list.h
@@ -19,8 +19,9 @@
* IN THE SOFTWARE.
*/
-TEST_DECLARE (ping_pong)
-TEST_DECLARE (ping_pong_v6)
+TEST_DECLARE (tcp_ping_pong)
+TEST_DECLARE (tcp_ping_pong_v6)
+TEST_DECLARE (pipe_ping_pong)
TEST_DECLARE (delayed_accept)
TEST_DECLARE (tcp_writealot)
TEST_DECLARE (bind_error_addrinuse)
@@ -54,20 +55,25 @@ TEST_DECLARE (getaddrinfo_concurrent)
TEST_DECLARE (gethostbyname)
TEST_DECLARE (fail_always)
TEST_DECLARE (pass_always)
-HELPER_DECLARE (echo_server)
+HELPER_DECLARE (tcp4_echo_server)
+HELPER_DECLARE (tcp6_echo_server)
+HELPER_DECLARE (pipe_echo_server)
TASK_LIST_START
- TEST_ENTRY (ping_pong)
- TEST_HELPER (ping_pong, echo_server)
+ TEST_ENTRY (tcp_ping_pong)
+ TEST_HELPER (tcp_ping_pong, tcp4_echo_server)
- TEST_ENTRY (ping_pong_v6)
- TEST_HELPER (ping_pong_v6, echo_server)
+ TEST_ENTRY (tcp_ping_pong_v6)
+ TEST_HELPER (tcp_ping_pong_v6, tcp6_echo_server)
+
+ TEST_ENTRY (pipe_ping_pong)
+ TEST_HELPER (pipe_ping_pong, pipe_echo_server)
TEST_ENTRY (delayed_accept)
TEST_ENTRY (tcp_writealot)
- TEST_HELPER (tcp_writealot, echo_server)
+ TEST_HELPER (tcp_writealot, tcp4_echo_server)
TEST_ENTRY (bind_error_addrinuse)
TEST_ENTRY (bind_error_addrnotavail_1)
@@ -86,10 +92,10 @@ TASK_LIST_START
TEST_ENTRY (connection_fail_doesnt_auto_close)
TEST_ENTRY (shutdown_eof)
- TEST_HELPER (shutdown_eof, echo_server)
+ TEST_HELPER (shutdown_eof, tcp4_echo_server)
TEST_ENTRY (callback_stack)
- TEST_HELPER (callback_stack, echo_server)
+ TEST_HELPER (callback_stack, tcp4_echo_server)
TEST_ENTRY (timer)
@@ -113,7 +119,7 @@ TASK_LIST_START
TEST_ENTRY (getaddrinfo_concurrent)
TEST_ENTRY (gethostbyname)
- TEST_HELPER (gethostbyname, echo_server)
+ TEST_HELPER (gethostbyname, tcp4_echo_server)
#if 0
/* These are for testing the test runner. */
diff --git a/test/test-ping-pong.c b/test/test-ping-pong.c
index 34de78b..97fe794 100644
--- a/test/test-ping-pong.c
+++ b/test/test-ping-pong.c
@@ -39,7 +39,10 @@ static char PING[] = "PING\n";
typedef struct {
int pongs;
int state;
- uv_tcp_t tcp;
+ union {
+ uv_tcp_t tcp;
+ uv_pipe_t pipe;
+ };
uv_req_t connect_req;
uv_req_t read_req;
char read_buffer[BUFSIZE];
@@ -93,11 +96,11 @@ static void pinger_write_ping(pinger_t* pinger) {
}
-static void pinger_read_cb(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) {
+static void pinger_read_cb(uv_stream_t* stream, ssize_t nread, uv_buf_t buf) {
unsigned int i;
pinger_t* pinger;
- pinger = (pinger_t*)tcp->data;
+ pinger = (pinger_t*)stream->data;
if (nread < 0) {
ASSERT(uv_last_error().code == UV_EOF);
@@ -142,9 +145,10 @@ static void pinger_on_connect(uv_req_t *req, int status) {
}
-static void pinger_new() {
+/* same ping-pong test, but using IPv6 connection */
+static void tcp_pinger_v6_new() {
int r;
- struct sockaddr_in server_addr = uv_ip4_addr("127.0.0.1", TEST_PORT);
+ struct sockaddr_in6 server_addr = uv_ip6_addr("::1", TEST_PORT);
pinger_t *pinger;
pinger = (pinger_t*)malloc(sizeof(*pinger));
@@ -161,27 +165,37 @@ static void pinger_new() {
uv_req_init(&pinger->connect_req, (uv_handle_t*)(&pinger->tcp),
(void *(*)(void *))pinger_on_connect);
- r = uv_tcp_connect(&pinger->connect_req, server_addr);
+ r = uv_tcp_connect6(&pinger->connect_req, server_addr);
ASSERT(!r);
}
-TEST_IMPL(ping_pong) {
- uv_init();
+static void tcp_pinger_new() {
+ int r;
+ struct sockaddr_in server_addr = uv_ip4_addr("127.0.0.1", TEST_PORT);
+ pinger_t *pinger;
- pinger_new();
- uv_run();
+ pinger = (pinger_t*)malloc(sizeof(*pinger));
+ pinger->state = 0;
+ pinger->pongs = 0;
- ASSERT(completed_pingers == 1);
+ /* Try to connec to the server and do NUM_PINGS ping-pongs. */
+ r = uv_tcp_init(&pinger->tcp);
+ pinger->tcp.data = pinger;
+ ASSERT(!r);
- return 0;
+ /* We are never doing multiple reads/connects at a time anyway. */
+ /* so these handles can be pre-initialized. */
+ uv_req_init(&pinger->connect_req, (uv_handle_t*)(&pinger->tcp),
+ pinger_on_connect);
+
+ r = uv_tcp_connect(&pinger->connect_req, server_addr);
+ ASSERT(!r);
}
-/* same ping-pong test, but using IPv6 connection */
-static void pinger_v6_new() {
+static void pipe_pinger_new() {
int r;
- struct sockaddr_in6 server_addr = uv_ip6_addr("::1", TEST_PORT);
pinger_t *pinger;
pinger = (pinger_t*)malloc(sizeof(*pinger));
@@ -189,24 +203,48 @@ static void pinger_v6_new() {
pinger->pongs = 0;
/* Try to connec to the server and do NUM_PINGS ping-pongs. */
- r = uv_tcp_init(&pinger->tcp);
- pinger->tcp.data = pinger;
+ r = uv_pipe_init(&pinger->pipe);
+ pinger->pipe.data = pinger;
ASSERT(!r);
/* We are never doing multiple reads/connects at a time anyway. */
/* so these handles can be pre-initialized. */
- uv_req_init(&pinger->connect_req, (uv_handle_t*)(&pinger->tcp),
+ uv_req_init(&pinger->connect_req, (uv_handle_t*)(&pinger->pipe),
(void *(*)(void *))pinger_on_connect);
- r = uv_tcp_connect6(&pinger->connect_req, server_addr);
+ r = uv_pipe_connect(&pinger->connect_req, TEST_PIPENAME);
ASSERT(!r);
}
-TEST_IMPL(ping_pong_v6) {
+TEST_IMPL(tcp_ping_pong) {
+ uv_init();
+
+ tcp_pinger_new();
+ uv_run();
+
+ ASSERT(completed_pingers == 1);
+
+ return 0;
+}
+
+
+TEST_IMPL(tcp_ping_pong_v6) {
+ uv_init();
+
+ tcp_pinger_v6_new();
+ uv_run();
+
+ ASSERT(completed_pingers == 1);
+
+ return 0;
+}
+
+
+TEST_IMPL(pipe_ping_pong) {
uv_init();
- pinger_v6_new();
+ pipe_pinger_new();
uv_run();
ASSERT(completed_pingers == 1);
--
1.7.4.msysgit.0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment