Skip to content

Instantly share code, notes, and snippets.

@yunong
Created March 28, 2013 17:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yunong/efe869a0345867d54adf to your computer and use it in GitHub Desktop.
Save yunong/efe869a0345867d54adf to your computer and use it in GitHub Desktop.
zk 3.4.3 client patch
diff --git src/c/include/recordio.h src/c/include/recordio.h
index 4e1b78e..71a0c69 100644
--- src/c/include/recordio.h
+++ src/c/include/recordio.h
@@ -73,7 +73,9 @@ void close_buffer_iarchive(struct iarchive **ia);
char *get_buffer(struct oarchive *);
int get_buffer_len(struct oarchive *);
+#ifndef __sun
int64_t htonll(int64_t v);
+#endif
#ifdef __cplusplus
}
diff --git src/c/include/zookeeper.h src/c/include/zookeeper.h
index 7d1066a..c419c98 100644
--- src/c/include/zookeeper.h
+++ src/c/include/zookeeper.h
@@ -559,6 +559,16 @@ ZOOAPI int zookeeper_process(zhandle_t *zh, int events);
#endif
/**
+ * \brief Closes the socket fd currently held by the handle and resets it to -1.
+ *
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \return a result code.
+ * ZOK - success
+ * -1 - in case of failure
+ */
+ZOOAPI int zookeeper_close_fd(zhandle_t *zh);
+
+/**
* \brief signature of a completion function for a call that returns void.
*
* This method will be invoked at the end of a asynchronous call and also as
diff --git src/c/src/recordio.c src/c/src/recordio.c
index cf8a1ac..a04325c 100644
--- src/c/src/recordio.c
+++ src/c/src/recordio.c
@@ -80,6 +80,7 @@ int oa_serialize_int(struct oarchive *oa, const char *tag, const int32_t *d)
priv->off+=sizeof(i);
return 0;
}
+#ifndef __sun
int64_t htonll(int64_t v)
{
int i = 0;
@@ -95,7 +96,7 @@ int64_t htonll(int64_t v)
return v;
}
-
+#endif
int oa_serialize_long(struct oarchive *oa, const char *tag, const int64_t *d)
{
const int64_t i = htonll(*d);
diff --git src/c/src/zk_adaptor.h src/c/src/zk_adaptor.h
index 6aed38f..4fd6b5a 100644
--- src/c/src/zk_adaptor.h
+++ src/c/src/zk_adaptor.h
@@ -194,7 +194,9 @@ struct _zhandle {
struct timeval last_send; /* The time that the last message was sent */
struct timeval last_ping; /* The time that the last PING was sent */
struct timeval next_deadline; /* The time of the next deadline */
- int recv_timeout; /* The maximum amount of time that can go by without
+ struct timeval last_connect; /* The time when the connection was still up */
+ struct timeval init; /* The time when this handle was initialized */
+ int recv_timeout; /* The maximum amount of time that can go by without
receiving anything from the zookeeper server */
buffer_list_t *input_buffer; /* the current buffer being read in */
buffer_head_t to_process; /* The buffers that have been read and are ready to be processed. */
diff --git src/c/src/zk_log.c src/c/src/zk_log.c
index 4dc0f21..fbdfcfb 100644
--- src/c/src/zk_log.c
+++ src/c/src/zk_log.c
@@ -132,6 +132,7 @@ void log_message(ZooLogLevel curLevel,int line,const char* funcName,
char timebuf [TIME_NOW_BUF_SIZE];
#endif
if(pid==0)pid=getpid();
+#if !defined(__sun) || defined(_LP64) /* pid printed with %d; 32-bit Solaris (long pid_t) is handled below */
#ifndef THREADED
fprintf(LOGSTREAM, "%s:%d:%s@%s@%d: %s\n", time_now(get_time_buffer()),pid,
dbgLevelStr[curLevel],funcName,line,message);
@@ -146,6 +147,17 @@ void log_message(ZooLogLevel curLevel,int line,const char* funcName,
dbgLevelStr[curLevel],funcName,line,message);
#endif
#endif
+#endif
+#if defined(__sun) && !defined(_LP64) /* 32-bit Solaris: pid_t is long there, so print it with %ld */
+#ifndef THREADED
+ fprintf(LOGSTREAM, "%s:%ld:%s@%s@%d: %s\n", time_now(get_time_buffer()),pid,
+ dbgLevelStr[curLevel],funcName,line,message);
+#else
+ fprintf(LOGSTREAM, "%s:%ld(0x%lx):%s@%s@%d: %s\n", time_now(get_time_buffer()),pid,
+ (unsigned long int)pthread_self(),
+ dbgLevelStr[curLevel],funcName,line,message);
+#endif
+#endif
fflush(LOGSTREAM);
}
diff --git src/c/src/zookeeper.c src/c/src/zookeeper.c
index f117b18..2161d6c 100644
--- src/c/src/zookeeper.c
+++ src/c/src/zookeeper.c
@@ -789,6 +789,7 @@ zhandle_t *zookeeper_init(const char *host, watcher_fn watcher,
if (!zh) {
return 0;
}
+ gettimeofday(&zh->init, 0);
zh->fd = -1;
zh->state = NOTCONNECTED_STATE_DEF;
zh->context = context;
@@ -1232,9 +1233,9 @@ static void handle_error(zhandle_t *zh,int rc)
cleanup_bufs(zh,1,rc);
zh->fd = -1;
zh->connect_index++;
- if (!is_unrecoverable(zh)) {
- zh->state = 0;
- }
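+    /* NOTE: zh->state is deliberately no longer reset to 0 on recoverable
+     * errors -- presumably so zookeeper_interest() can still see the prior
+     * state when it applies its timeout check. TODO(review): confirm. */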
if (process_async(zh->outstanding_sync)) {
process_completions(zh);
}
@@ -1551,8 +1552,33 @@ int zookeeper_interest(zhandle_t *zh, int *fd, int *interest,
if (is_unrecoverable(zh))
return ZINVALIDSTATE;
gettimeofday(&now, 0);
+    if (zh->state == ZOO_CONNECTING_STATE) {
+        /* A zeroed last_connect means this handle has never been
+         * connected, so time the attempt from handle initialization
+         * instead. Deciding this first matters: an interval measured
+         * from the zero timestamp is meaningless and must not be
+         * compared against recv_timeout (the old code did exactly that
+         * before it ever looked at the never-connected case). */
+        int never_connected = (zh->last_connect.tv_sec == 0);
+        int elapsed = calculate_interval(
+            never_connected ? &zh->init : &zh->last_connect, &now);
+        LOG_DEBUG(("time since disconnect is %d", elapsed));
+        if (elapsed > zh->recv_timeout) {
+            /* We failed to (re)connect before the session timeout
+             * elapsed, so the server would have expired us; inform
+             * the upstream consumer by returning ZSESSIONEXPIRED. */
+            if (never_connected) {
+                LOG_ERROR(("unable to connect to zk server at all"));
+            } else {
+                LOG_ERROR(("Exceeded timeout and disconnected from server."));
+            }
+            return api_epilog(zh, ZSESSIONEXPIRED);
+        }
+    }
+
+ int time_left = 0;
if(zh->next_deadline.tv_sec!=0 || zh->next_deadline.tv_usec!=0){
- int time_left = calculate_interval(&zh->next_deadline, &now);
+ time_left = calculate_interval(&zh->next_deadline, &now);
if (time_left > 10)
LOG_WARN(("Exceeded deadline by %dms", time_left));
}
@@ -1602,12 +1628,20 @@ int zookeeper_interest(zhandle_t *zh, int *fd, int *interest,
#endif
}
if (rc == -1) {
+ zh->state = ZOO_CONNECTING_STATE;
+ *interest = 3;
+ *tv = get_timeval(zh->recv_timeout/3);
+ zh->last_recv = now;
+ zh->last_send = now;
+ zh->last_ping = now;
/* we are handling the non-blocking connect according to
* the description in section 16.3 "Non-blocking connect"
* in UNIX Network Programming vol 1, 3rd edition */
- if (errno == EWOULDBLOCK || errno == EINPROGRESS)
- zh->state = ZOO_CONNECTING_STATE;
- else
+ if (errno == EINPROGRESS) {
+ *fd = zh->fd;
+ LOG_DEBUG(("non blocking connection in progress with zh->fd %d, fd %d", zh->fd, *fd));
+ return api_epilog(zh, ZOK);
+ } else
return api_epilog(zh,handle_socket_error_msg(zh,__LINE__,
ZCONNECTIONLOSS,"connect() call failed"));
} else {
@@ -1743,6 +1777,7 @@ static int check_events(zhandle_t *zh, int events)
memcpy(zh->client_id.passwd, &zh->primer_storage.passwd,
sizeof(zh->client_id.passwd));
zh->state = ZOO_CONNECTED_STATE;
+ gettimeofday(&zh->last_connect, 0);
LOG_INFO(("session establishment complete on server [%s], sessionId=%#llx, negotiated timeout=%d",
format_endpoint_info(&zh->addrs[zh->connect_index]),
newid, zh->recv_timeout));
@@ -2287,8 +2322,18 @@ int zookeeper_process(zhandle_t *zh, int events)
if (process_async(zh->outstanding_sync)) {
process_completions(zh);
}
+ gettimeofday(&zh->last_connect, 0);
return api_epilog(zh,ZOK);}
+int zookeeper_close_fd(zhandle_t *zh) {
+    int rc = 0; /* ZOK: a null handle or an already-closed fd (-1) is a no-op */
+    if (zh != 0 && zh->fd != -1) {
+        if (close(zh->fd) != 0) rc = -1; /* report failure, as the header doc promises */
+        zh->fd = -1; /* forget the fd even on failure; close() must not be retried on it */
+    }
+    return rc;
+}
+
int zoo_state(zhandle_t *zh)
{
if(zh!=0)
diff --git zookeeper-3.4.3.tar.gz zookeeper-3.4.3.tar.gz
new file mode 100644
index 0000000..0a877b4
Binary files /dev/null and zookeeper-3.4.3.tar.gz differ
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment