Skip to content

Instantly share code, notes, and snippets.

@ichizok
Last active October 8, 2016 18:41
Show Gist options
  • Save ichizok/6e0c00daf387b32bebcc2972f0cca137 to your computer and use it in GitHub Desktop.
Save ichizok/6e0c00daf387b32bebcc2972f0cca137 to your computer and use it in GitHub Desktop.
diff --git a/src/channel.c b/src/channel.c
index d4ec60b..9aa45dd 100644
--- a/src/channel.c
+++ b/src/channel.c
@@ -318,7 +318,7 @@ add_channel(void)
channel->ch_id = next_ch_id++;
ch_log(channel, "Created channel");
- for (part = PART_SOCK; part <= PART_IN; ++part)
+ for (part = PART_SOCK; part < PART_NUM; ++part)
{
channel->ch_part[part].ch_fd = INVALID_FD;
#ifdef FEAT_GUI_X11
@@ -421,9 +421,7 @@ channel_free(channel_T *channel)
if (!in_free_unref_items)
{
if (safe_to_invoke_callback == 0)
- {
channel->ch_to_be_freed = TRUE;
- }
else
{
channel_free_contents(channel);
@@ -928,6 +926,7 @@ channel_open(
channel->ch_nb_close_cb = nb_close_cb;
channel->ch_hostname = (char *)vim_strsave((char_u *)hostname);
channel->ch_port = port_in;
+ channel->ch_to_be_closed |= (1 << PART_SOCK);
#ifdef FEAT_GUI
channel_gui_register_one(channel, PART_SOCK);
@@ -998,12 +997,19 @@ theend:
}
static void
-may_close_part(sock_T *fd)
+ch_close_part(channel_T *channel, int part)
{
+ sock_T *fd = &channel->ch_part[part].ch_fd;
+
if (*fd != INVALID_FD)
{
- fd_close(*fd);
+ if (part == PART_SOCK)
+ sock_close(*fd);
+ else
+ fd_close(*fd);
*fd = INVALID_FD;
+
+ channel->ch_to_be_closed &= ~(1 << part);
}
}
@@ -1012,7 +1018,7 @@ channel_set_pipes(channel_T *channel, sock_T in, sock_T out, sock_T err)
{
if (in != INVALID_FD)
{
- may_close_part(&channel->CH_IN_FD);
+ ch_close_part(channel, PART_IN);
channel->CH_IN_FD = in;
}
if (out != INVALID_FD)
@@ -1020,8 +1026,9 @@ channel_set_pipes(channel_T *channel, sock_T in, sock_T out, sock_T err)
# if defined(FEAT_GUI)
channel_gui_unregister_one(channel, PART_OUT);
# endif
- may_close_part(&channel->CH_OUT_FD);
+ ch_close_part(channel, PART_OUT);
channel->CH_OUT_FD = out;
+ channel->ch_to_be_closed |= (1 << PART_OUT);
# if defined(FEAT_GUI)
channel_gui_register_one(channel, PART_OUT);
# endif
@@ -1031,8 +1038,9 @@ channel_set_pipes(channel_T *channel, sock_T in, sock_T out, sock_T err)
# if defined(FEAT_GUI)
channel_gui_unregister_one(channel, PART_ERR);
# endif
- may_close_part(&channel->CH_ERR_FD);
+ ch_close_part(channel, PART_ERR);
channel->CH_ERR_FD = err;
+ channel->ch_to_be_closed |= (1 << PART_ERR);
# if defined(FEAT_GUI)
channel_gui_register_one(channel, PART_ERR);
# endif
@@ -1154,7 +1162,7 @@ channel_set_options(channel_T *channel, jobopt_T *opt)
int part;
if (opt->jo_set & JO_MODE)
- for (part = PART_SOCK; part <= PART_IN; ++part)
+ for (part = PART_SOCK; part < PART_NUM; ++part)
channel->ch_part[part].ch_mode = opt->jo_mode;
if (opt->jo_set & JO_IN_MODE)
channel->ch_part[PART_IN].ch_mode = opt->jo_in_mode;
@@ -1164,7 +1172,7 @@ channel_set_options(channel_T *channel, jobopt_T *opt)
channel->ch_part[PART_ERR].ch_mode = opt->jo_err_mode;
if (opt->jo_set & JO_TIMEOUT)
- for (part = PART_SOCK; part <= PART_IN; ++part)
+ for (part = PART_SOCK; part < PART_NUM; ++part)
channel->ch_part[part].ch_timeout = opt->jo_timeout;
if (opt->jo_set & JO_OUT_TIMEOUT)
channel->ch_part[PART_OUT].ch_timeout = opt->jo_out_timeout;
@@ -1448,7 +1456,7 @@ channel_write_in(channel_T *channel)
ch_log(channel, "Finished writing all lines to channel");
/* Close the pipe/socket, so that the other side gets EOF. */
- may_close_part(&channel->CH_IN_FD);
+ ch_close_part(channel, PART_IN);
}
else
ch_logn(channel, "Still %d more lines to write",
@@ -1465,7 +1473,7 @@ channel_buffer_free(buf_T *buf)
int part;
for (channel = first_channel; channel != NULL; channel = channel->ch_next)
- for (part = PART_SOCK; part <= PART_IN; ++part)
+ for (part = PART_SOCK; part < PART_NUM; ++part)
{
chanpart_T *ch_part = &channel->ch_part[part];
@@ -2640,7 +2648,7 @@ channel_status(channel_T *channel, int req_part)
{
if (channel_is_open(channel))
return "open";
- for (part = PART_SOCK; part <= PART_ERR; ++part)
+ for (part = PART_SOCK; part < PART_IN; ++part)
if (channel_has_readahead(channel, part))
{
has_readahead = TRUE;
@@ -2736,14 +2744,10 @@ channel_close(channel_T *channel, int invoke_close_cb)
channel_gui_unregister(channel);
#endif
- if (channel->CH_SOCK_FD != INVALID_FD)
- {
- sock_close(channel->CH_SOCK_FD);
- channel->CH_SOCK_FD = INVALID_FD;
- }
- may_close_part(&channel->CH_IN_FD);
- may_close_part(&channel->CH_OUT_FD);
- may_close_part(&channel->CH_ERR_FD);
+ ch_close_part(channel, PART_SOCK);
+ ch_close_part(channel, PART_IN);
+ ch_close_part(channel, PART_OUT);
+ ch_close_part(channel, PART_ERR);
if (invoke_close_cb && channel->ch_close_cb != NULL)
{
@@ -2757,7 +2761,7 @@ channel_close(channel_T *channel, int invoke_close_cb)
* the channel being freed halfway. */
++channel->ch_refcount;
ch_log(channel, "Invoking callbacks before closing");
- for (part = PART_SOCK; part <= PART_ERR; ++part)
+ for (part = PART_SOCK; part < PART_IN; ++part)
while (may_invoke_callback(channel, part))
;
@@ -2789,7 +2793,7 @@ channel_close(channel_T *channel, int invoke_close_cb)
}
/* any remaining messages are useless now */
- for (part = PART_SOCK; part <= PART_ERR; ++part)
+ for (part = PART_SOCK; part < PART_IN; ++part)
drop_messages(channel, part);
}
@@ -2802,7 +2806,7 @@ channel_close(channel_T *channel, int invoke_close_cb)
void
channel_close_in(channel_T *channel)
{
- may_close_part(&channel->CH_IN_FD);
+ ch_close_part(channel, PART_IN);
}
/*
@@ -3043,11 +3047,18 @@ channel_wait(channel_T *channel, sock_T fd, int timeout)
}
static void
-channel_close_on_error(channel_T *channel, char *func)
+ch_close_part_on_error(channel_T *channel, int part, int is_err, char *func)
{
- /* Do not call emsg(), most likely the other end just exited. */
- ch_errors(channel, "%s(): Cannot read from channel, will close it soon",
- func);
+ char msgbuf[80];
+
+ sprintf(msgbuf, "%%s(): Read %s from ch_part[%d], closing",
+ (is_err ? "error" : "EOF"), part);
+
+ if (is_err)
+ /* Do not call emsg(), most likely the other end just exited. */
+ ch_errors(channel, msgbuf, func);
+ else
+ ch_logs(channel, msgbuf, func);
/* Queue a "DETACH" netbeans message in the command queue in order to
* terminate the netbeans session later. Do not end the session here
@@ -3061,24 +3072,21 @@ channel_close_on_error(channel_T *channel, char *func)
* Only send "DETACH" for a netbeans channel.
*/
if (channel->ch_nb_close_cb != NULL)
- channel_save(channel, PART_OUT, (char_u *)DETACH_MSG_RAW,
- (int)STRLEN(DETACH_MSG_RAW), FALSE, "PUT ");
+ channel_save(channel, PART_SOCK, (char_u *)DETACH_MSG_RAW,
+ (int)STRLEN(DETACH_MSG_RAW), FALSE, "PUT ");
- /* When reading from stdout is not possible, assume the other side has
- * died. Don't close the channel right away, it may be the wrong moment
- * to invoke callbacks. */
- channel->ch_to_be_closed = TRUE;
+ ch_close_part(channel, part);
#ifdef FEAT_GUI
/* Stop listening to GUI events right away. */
- channel_gui_unregister(channel);
+ channel_gui_unregister_one(channel, part);
#endif
}
static void
channel_close_now(channel_T *channel)
{
- ch_log(channel, "Closing channel because of previous read error");
+ ch_log(channel, "Closing channel because all readable fds are closed");
channel_close(channel, TRUE);
if (channel->ch_nb_close_cb != NULL)
(*channel->ch_nb_close_cb)();
@@ -3098,10 +3106,6 @@ channel_read(channel_T *channel, int part, char *func)
sock_T fd;
int use_socket = FALSE;
- /* If we detected a read error don't try reading again. */
- if (channel->ch_to_be_closed)
- return;
-
fd = channel->ch_part[part].ch_fd;
if (fd == INVALID_FD)
{
@@ -3141,7 +3145,7 @@ channel_read(channel_T *channel, int part, char *func)
/* Reading a disconnection (readlen == 0), or an error. */
if (readlen <= 0)
- channel_close_on_error(channel, func);
+ ch_close_part_on_error(channel, part, (len < 0), func);
#if defined(CH_HAS_GUI) && defined(FEAT_GUI_GTK)
/* signal the main loop that there is something to read */
@@ -3416,12 +3420,8 @@ channel_handle_events(void)
for (channel = first_channel; channel != NULL; channel = channel->ch_next)
{
- /* If we detected a read error don't try reading again. */
- if (channel->ch_to_be_closed)
- continue;
-
/* check the socket and pipes */
- for (part = PART_SOCK; part <= PART_ERR; ++part)
+ for (part = PART_SOCK; part < PART_IN; ++part)
{
fd = channel->ch_part[part].ch_fd;
if (fd != INVALID_FD)
@@ -3431,7 +3431,8 @@ channel_handle_events(void)
if (r == CW_READY)
channel_read(channel, part, "channel_handle_events");
else if (r == CW_ERROR)
- channel_close_on_error(channel, "channel_handle_events()");
+ ch_close_part_on_error(channel, part, TRUE,
+ "channel_handle_events");
}
}
}
@@ -3816,9 +3817,9 @@ channel_parse_messages(void)
}
while (channel != NULL)
{
- if (channel->ch_to_be_closed)
+ if (channel->ch_to_be_closed == 0)
{
- channel->ch_to_be_closed = FALSE;
+ channel->ch_to_be_closed = (1 << PART_NUM);
channel_close_now(channel);
/* channel may have been freed, start over */
channel = first_channel;
@@ -3840,7 +3841,7 @@ channel_parse_messages(void)
continue;
}
if (channel->ch_part[part].ch_fd != INVALID_FD
- || channel_has_readahead(channel, part))
+ || channel_has_readahead(channel, part))
{
/* Increase the refcount, in case the handler causes the channel
* to be unreferenced or closed. */
@@ -4679,7 +4680,7 @@ job_start(typval_T *argvars)
goto theend;
/* Check that when io is "file" that there is a file name. */
- for (part = PART_OUT; part <= PART_IN; ++part)
+ for (part = PART_OUT; part < PART_NUM; ++part)
if ((opt.jo_set & (JO_OUT_IO << (part - PART_OUT)))
&& opt.jo_io[part] == JIO_FILE
&& (!(opt.jo_set & (JO_OUT_NAME << (part - PART_OUT)))
diff --git a/src/eval.c b/src/eval.c
index 3b5abe9..037be9c 100644
--- a/src/eval.c
+++ b/src/eval.c
@@ -5630,7 +5630,7 @@ set_ref_in_item(
if (ch != NULL && ch->ch_copyID != copyID)
{
ch->ch_copyID = copyID;
- for (part = PART_SOCK; part <= PART_IN; ++part)
+ for (part = PART_SOCK; part < PART_NUM; ++part)
{
for (jq = ch->ch_part[part].ch_json_head.jq_next; jq != NULL;
jq = jq->jq_next)
diff --git a/src/structs.h b/src/structs.h
index 2a4284a..b0b0c08 100644
--- a/src/structs.h
+++ b/src/structs.h
@@ -1499,19 +1499,21 @@ typedef enum {
/* Ordering matters, it is used in for loops: IN is last, only SOCK/OUT/ERR
* are polled. */
-#define PART_SOCK 0
+typedef enum {
+ PART_SOCK = 0,
#define CH_SOCK_FD ch_part[PART_SOCK].ch_fd
-
#ifdef FEAT_JOB_CHANNEL
-# define INVALID_FD (-1)
-
-# define PART_OUT 1
-# define PART_ERR 2
-# define PART_IN 3
+#define INVALID_FD (-1)
+ PART_OUT,
# define CH_OUT_FD ch_part[PART_OUT].ch_fd
+ PART_ERR,
# define CH_ERR_FD ch_part[PART_ERR].ch_fd
+ PART_IN,
# define CH_IN_FD ch_part[PART_IN].ch_fd
#endif
+ PART_NUM
+} ch_part_T;
+
/* The per-fd info for a channel. */
typedef struct {
@@ -1566,14 +1568,14 @@ struct channel_S {
int ch_id; /* ID of the channel */
int ch_last_msg_id; /* ID of the last message */
- chanpart_T ch_part[4]; /* info for socket, out, err and in */
+ chanpart_T ch_part[PART_NUM]; /* info for socket, out, err and in */
char *ch_hostname; /* only for socket, allocated */
int ch_port; /* only for socket */
- int ch_to_be_closed; /* When TRUE reading or writing failed and
- * the channel must be closed when it's safe
- * to invoke callbacks. */
+ int ch_to_be_closed; /* Bitset of (1 << part): readable parts
+ * with an open fd.  When 0 the channel gets
+ * closed and this is set to (1 << PART_NUM). */
int ch_to_be_freed; /* When TRUE channel must be freed when it's
* safe to invoke callbacks. */
int ch_error; /* When TRUE an error was reported. Avoids
diff --git a/src/testdir/test_channel.vim b/src/testdir/test_channel.vim
index 0756dd5..fbcd496 100644
--- a/src/testdir/test_channel.vim
+++ b/src/testdir/test_channel.vim
@@ -1505,6 +1505,23 @@ func Test_read_nonl_line()
call assert_equal(3, g:linecount)
endfunc
+func Test_read_from_terminated_job()
+ if !has('job')
+ return
+ endif
+
+ let g:linecount = 0
+ if has('win32')
+ " workaround: 'shellescape' does not escape double quotes properly
+ let arg = 'import os,sys;os.close(1);sys.stderr.write(\"test\n\")'
+ else
+ let arg = 'import os,sys;os.close(1);sys.stderr.write("test\n")'
+ endif
+ call job_start([s:python, '-c', arg], {'callback': 'MyLineCountCb'})
+ call WaitFor('1 <= g:linecount')
+ call assert_equal(1, g:linecount)
+endfunc
+
function Ch_test_close_lambda(port)
let handle = ch_open('localhost:' . a:port, s:chopt)
if ch_status(handle) == "fail"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment