Created
August 25, 2011 23:34
-
-
Save piscisaureus/1172313 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
From 99e99ac89110edf5614a326096f41ed1c24f8650 Mon Sep 17 00:00:00 2001 | |
From: Bert Belder <bertbelder@gmail.com> | |
Date: Fri, 26 Aug 2011 01:32:27 +0200 | |
Subject: [PATCH 1/1] win: better shutdown for pipes | |
--- | |
include/uv-win.h | 3 +- | |
src/win/pipe.c | 200 +++++++++++++++++++++++++++++++++++++++++++++++------- | |
2 files changed, 176 insertions(+), 27 deletions(-) | |
diff --git a/include/uv-win.h b/include/uv-win.h | |
index a3ad457..06966d5 100644 | |
--- a/include/uv-win.h | |
+++ b/include/uv-win.h | |
@@ -133,9 +133,10 @@ typedef struct uv_buf_t { | |
uv_pipe_accept_t* pending_accepts; | |
#define uv_pipe_connection_fields \ | |
- HANDLE handle; | |
+ uv_timer_t* eof_timer; | |
#define UV_PIPE_PRIVATE_FIELDS \ | |
+ HANDLE handle; \ | |
wchar_t* name; \ | |
union { \ | |
struct { uv_pipe_server_fields }; \ | |
diff --git a/src/win/pipe.c b/src/win/pipe.c | |
index 39110e6..35a9d49 100644 | |
--- a/src/win/pipe.c | |
+++ b/src/win/pipe.c | |
@@ -31,6 +31,20 @@ | |
/* A zero-size buffer for use by uv_pipe_read */ | |
static char uv_zero_[] = ""; | |
+/* Null uv_buf_t */ | |
+static const uv_buf_t uv_null_buf_ = { 0, NULL }; | |
+ | |
+/* The timeout that the pipe will wait for the remote end to write data */ | |
+/* when the local ends wants to shut it down. */ | |
+static const int64_t eof_timeout = 50; /* ms */ | |
+ | |
+static void eof_timer_init(uv_pipe_t* pipe); | |
+static void eof_timer_start(uv_pipe_t* pipe); | |
+static void eof_timer_stop(uv_pipe_t* pipe); | |
+static void eof_timer_cb(uv_timer_t* timer, int status); | |
+static void eof_timer_destroy(uv_pipe_t* pipe); | |
+static void eof_timer_close_cb(uv_handle_t* handle); | |
+ | |
static void uv_unique_pipe_name(char* ptr, char* name, size_t size) { | |
_snprintf(name, size, "\\\\.\\pipe\\uv\\%p-%d", ptr, GetCurrentProcessId()); | |
@@ -42,9 +56,8 @@ int uv_pipe_init(uv_pipe_t* handle) { | |
handle->type = UV_NAMED_PIPE; | |
handle->reqs_pending = 0; | |
- handle->pending_accepts = NULL; | |
- handle->name = NULL; | |
handle->handle = INVALID_HANDLE_VALUE; | |
+ handle->name = NULL; | |
uv_counters()->pipe_init++; | |
@@ -69,6 +82,12 @@ int uv_pipe_init_with_handle(uv_pipe_t* handle, HANDLE pipeHandle) { | |
} | |
+static void uv_pipe_connection_init(uv_pipe_t* handle) { | |
+ uv_connection_init((uv_stream_t*) handle); | |
+ handle->eof_timer = NULL; | |
+} | |
+ | |
+ | |
int uv_stdio_pipe_server(uv_pipe_t* handle, DWORD access, char* name, size_t nameSize) { | |
HANDLE pipeHandle; | |
int errno; | |
@@ -112,7 +131,7 @@ int uv_stdio_pipe_server(uv_pipe_t* handle, DWORD access, char* name, size_t nam | |
goto done; | |
} | |
- uv_connection_init((uv_stream_t*)handle); | |
+ uv_pipe_connection_init(handle); | |
handle->handle = pipeHandle; | |
handle->flags |= UV_HANDLE_GIVEN_OS_HANDLE; | |
err = 0; | |
@@ -196,12 +215,10 @@ void uv_pipe_endgame(uv_pipe_t* handle) { | |
} | |
if (pipe_info.OutboundQuota == pipe_info.WriteQuotaAvailable) { | |
- /* Short-circuit, no need to call FlushFileBuffers. */ | |
handle->flags |= UV_HANDLE_SHUT; | |
- if (req->cb) { | |
- req->cb(req, 0); | |
- } | |
- DECREASE_PENDING_REQ_COUNT(handle); | |
+ | |
+ /* Short-circuit, no need to call FlushFileBuffers. */ | |
+ uv_insert_pending_req((uv_req_t*) req); | |
return; | |
} | |
@@ -313,6 +330,7 @@ int uv_pipe_bind(uv_pipe_t* handle, const char* name) { | |
goto error; | |
} | |
+ handle->pending_accepts = NULL; | |
handle->flags |= UV_HANDLE_PIPESERVER; | |
handle->flags |= UV_HANDLE_BOUND; | |
@@ -470,7 +488,14 @@ void close_pipe(uv_pipe_t* handle, int* status, uv_err_t* err) { | |
} | |
} | |
- } else if (handle->handle != INVALID_HANDLE_VALUE) { | |
+ } | |
+ | |
+ if (handle->flags & UV_HANDLE_CONNECTION) { | |
+ eof_timer_destroy(handle); | |
+ } | |
+ | |
+ if ((handle->flags & UV_HANDLE_CONNECTION) | |
+ && handle->handle != INVALID_HANDLE_VALUE) { | |
CloseHandle(handle->handle); | |
handle->handle = INVALID_HANDLE_VALUE; | |
} | |
@@ -545,7 +570,7 @@ int uv_pipe_accept(uv_pipe_t* server, uv_pipe_t* client) { | |
} | |
/* Initialize the client handle and copy the pipeHandle to the client */ | |
- uv_connection_init((uv_stream_t*) client); | |
+ uv_pipe_connection_init(client); | |
client->handle = req->pipeHandle; | |
/* Prepare the req to pick up a new connection */ | |
@@ -646,6 +671,9 @@ static void uv_pipe_queue_read(uv_pipe_t* handle) { | |
return; | |
} | |
+ /* Start the eof timer if there is one */ | |
+ eof_timer_start(handle); | |
+ | |
handle->flags |= UV_HANDLE_READ_PENDING; | |
handle->reqs_pending++; | |
} | |
@@ -734,6 +762,36 @@ int uv_pipe_write(uv_write_t* req, uv_pipe_t* handle, uv_buf_t bufs[], int bufcn | |
} | |
+static void uv_pipe_read_eof(uv_pipe_t* handle, uv_buf_t buf) { | |
+ /* If there is an eof timer running, we don't need it any more, */ | |
+ /* so discard it. */ | |
+ eof_timer_destroy(handle); | |
+ | |
+ handle->flags |= UV_HANDLE_EOF; | |
+ uv_read_stop((uv_stream_t*) handle); | |
+ | |
+ uv_set_error(UV_EOF, 0); | |
+ handle->read_cb((uv_stream_t*) handle, -1, uv_null_buf_); | |
+} | |
+ | |
+ | |
+static void uv_pipe_read_error(uv_pipe_t* handle, int error, uv_buf_t buf) { | |
+ uv_read_stop((uv_stream_t*) handle); | |
+ | |
+ uv_set_sys_error(GetLastError()); | |
+ handle->read_cb((uv_stream_t*)handle, -1, buf); | |
+} | |
+ | |
+ | |
+static void uv_pipe_read_error_or_eof(uv_pipe_t* handle, int error, uv_buf_t buf) { | |
+ if (error == ERROR_BROKEN_PIPE) { | |
+ uv_pipe_read_eof(handle, buf); | |
+ } else { | |
+ uv_pipe_read_error(handle, error, buf); | |
+ } | |
+} | |
+ | |
+ | |
void uv_process_pipe_read_req(uv_pipe_t* handle, uv_req_t* req) { | |
DWORD bytes, avail; | |
uv_buf_t buf; | |
@@ -741,18 +799,20 @@ void uv_process_pipe_read_req(uv_pipe_t* handle, uv_req_t* req) { | |
assert(handle->type == UV_NAMED_PIPE); | |
handle->flags &= ~UV_HANDLE_READ_PENDING; | |
+ eof_timer_stop(handle); | |
if (!REQ_SUCCESS(req)) { | |
/* An error occurred doing the 0-read. */ | |
if (handle->flags & UV_HANDLE_READING) { | |
- /* Stop reading and report error. */ | |
- handle->flags &= ~UV_HANDLE_READING; | |
- LOOP->last_error = GET_REQ_UV_ERROR(req); | |
- buf.base = 0; | |
- buf.len = 0; | |
- handle->read_cb((uv_stream_t*)handle, -1, buf); | |
+ uv_pipe_read_error_or_eof(handle, GET_REQ_ERROR(req), uv_null_buf_); | |
} | |
} else { | |
+ /* Did the pipe get closed because of shutdown? */ | |
+ if (handle->handle == INVALID_HANDLE_VALUE && | |
+ (handle->flags & UV_HANDLE_SHUT)) { | |
+ uv_pipe_read_eof(handle, uv_null_buf_); | |
+ } | |
+ | |
/* Do non-blocking reads until the buffer is empty */ | |
while (handle->flags & UV_HANDLE_READING) { | |
if (!PeekNamedPipe(handle->handle, | |
@@ -761,10 +821,7 @@ void uv_process_pipe_read_req(uv_pipe_t* handle, uv_req_t* req) { | |
NULL, | |
&avail, | |
NULL)) { | |
- uv_set_sys_error(GetLastError()); | |
- buf.base = 0; | |
- buf.len = 0; | |
- handle->read_cb((uv_stream_t*)handle, -1, buf); | |
+ uv_pipe_read_error_or_eof(handle, GetLastError(), uv_null_buf_); | |
break; | |
} | |
@@ -788,9 +845,7 @@ void uv_process_pipe_read_req(uv_pipe_t* handle, uv_req_t* req) { | |
break; | |
} | |
} else { | |
- /* Ouch! serious error. */ | |
- uv_set_sys_error(GetLastError()); | |
- handle->read_cb((uv_stream_t*)handle, -1, buf); | |
+ uv_pipe_read_error_or_eof(handle, GetLastError(), uv_null_buf_); | |
break; | |
} | |
} | |
@@ -863,7 +918,7 @@ void uv_process_pipe_connect_req(uv_pipe_t* handle, uv_connect_t* req) { | |
if (req->cb) { | |
if (REQ_SUCCESS(req)) { | |
- uv_connection_init((uv_stream_t*)handle); | |
+ uv_pipe_connection_init(handle); | |
((uv_connect_cb)req->cb)(req, 0); | |
} else { | |
LOOP->last_error = GET_REQ_UV_ERROR(req); | |
@@ -878,8 +933,17 @@ void uv_process_pipe_connect_req(uv_pipe_t* handle, uv_connect_t* req) { | |
void uv_process_pipe_shutdown_req(uv_pipe_t* handle, uv_shutdown_t* req) { | |
assert(handle->type == UV_NAMED_PIPE); | |
- CloseHandle(handle->handle); | |
- handle->handle = INVALID_HANDLE_VALUE; | |
+ /* Initialize and optionally start the eof timer. */ | |
+ /* This makes no sense if we've already seen EOF. */ | |
+ if (!(handle->flags & UV_HANDLE_EOF)) { | |
+ eof_timer_init(handle); | |
+ | |
+ /* If reading start the timer right now. */ | |
+ /* Otherwise uv_pipe_queue_read will start it. */ | |
+ if (handle->flags & UV_HANDLE_READ_PENDING) { | |
+ eof_timer_start(handle); | |
+ } | |
+ } | |
if (req->cb) { | |
req->cb(req, 0); | |
@@ -887,3 +951,87 @@ void uv_process_pipe_shutdown_req(uv_pipe_t* handle, uv_shutdown_t* req) { | |
DECREASE_PENDING_REQ_COUNT(handle); | |
} | |
+ | |
+ | |
+static void eof_timer_init(uv_pipe_t* pipe) { | |
+ int r; | |
+ | |
+ assert(pipe->eof_timer == NULL); | |
+ assert(pipe->flags & UV_HANDLE_CONNECTION); | |
+ | |
+ pipe->eof_timer = (uv_timer_t*) malloc(sizeof *pipe->eof_timer); | |
+ | |
+ r = uv_timer_init(pipe->eof_timer); | |
+ assert(r == 0); /* timers can't fail */ | |
+ pipe->eof_timer->data = pipe; | |
+} | |
+ | |
+ | |
+static void eof_timer_start(uv_pipe_t* pipe) { | |
+ assert(pipe->flags & UV_HANDLE_CONNECTION); | |
+ | |
+ if (pipe->eof_timer != NULL) { | |
+ uv_timer_start(pipe->eof_timer, eof_timer_cb, eof_timeout, 0); | |
+ } | |
+} | |
+ | |
+ | |
+static void eof_timer_stop(uv_pipe_t* pipe) { | |
+ assert(pipe->flags & UV_HANDLE_CONNECTION); | |
+ | |
+ if (pipe->eof_timer != NULL) { | |
+ uv_timer_stop(pipe->eof_timer); | |
+ } | |
+} | |
+ | |
+ | |
+static void eof_timer_cb(uv_timer_t* timer, int status) { | |
+ uv_pipe_t* pipe = (uv_pipe_t*) timer->data; | |
+ | |
+ assert(status == 0); /* timers can't fail */ | |
+ assert(pipe->type == UV_NAMED_PIPE); | |
+ | |
+ /* This should always be true, since we start the timer only */ | |
+ /* in uv_pipe_queue_read after successfully calling WriteFile, */ | |
+ /* or in uv_process_pipe_shutdown_req if a read is pending, */ | |
+ /* and we always immediately stop the timer in */ | |
+ /* uv_process_pipe_read_req. */ | |
+ assert(pipe->flags & UV_HANDLE_READ_PENDING) ; | |
+ | |
+ /* If there are many packets coming off the iocp then the timer callback */ | |
+ /* may be called before the read request is coming off the queue. */ | |
+ /* Therefore we check here if the read request has completed but will */ | |
+ /* be processed later. */ | |
+ if ((pipe->flags & UV_HANDLE_READ_PENDING) && | |
+ HasOverlappedIoCompleted(&pipe->read_req.overlapped)) { | |
+ return; | |
+ } | |
+ | |
+ /* Force both ends off the pipe. */ | |
+ CloseHandle(pipe->handle); | |
+ pipe->handle = INVALID_HANDLE_VALUE; | |
+ | |
+ /* Stop reading, so the pending read that is going to fail will */ | |
+ /* not be reported to the user. */ | |
+ uv_read_stop((uv_stream_t*) pipe); | |
+ | |
+ /* Report the eof and update flags. This will get reported even if the */ | |
+ /* user stopped reading in the meantime. TODO: is that okay? */ | |
+ uv_pipe_read_eof(pipe, uv_null_buf_); | |
+} | |
+ | |
+ | |
+static void eof_timer_destroy(uv_pipe_t* pipe) { | |
+ assert(pipe->flags && UV_HANDLE_CONNECTION); | |
+ | |
+ if (pipe->eof_timer) { | |
+ uv_close((uv_handle_t*) pipe->eof_timer, eof_timer_close_cb); | |
+ pipe->eof_timer = NULL; | |
+ } | |
+} | |
+ | |
+ | |
+static void eof_timer_close_cb(uv_handle_t* handle) { | |
+ assert(handle->type == UV_TIMER); | |
+ free(handle); | |
+} | |
-- | |
1.7.6.msysgit.0 | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment