Skip to content

Instantly share code, notes, and snippets.

@jbreams
Created June 28, 2015 16:38
Show Gist options
  • Save jbreams/64fad28478dd63e84adc to your computer and use it in GitHub Desktop.
Save jbreams/64fad28478dd63e84adc to your computer and use it in GitHub Desktop.
diff --git a/Makefile.am b/Makefile.am
index f375437..b219601 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -329,7 +329,8 @@ test_apps = \
test_bind_src_address \
test_metadata \
test_capabilities \
- test_xpub_nodrop
+ test_xpub_nodrop \
+ test_heartbeats
test_system_SOURCES = tests/test_system.cpp
test_system_LDADD = libzmq.la
@@ -498,6 +499,9 @@ test_capabilities_LDADD = libzmq.la
test_xpub_nodrop_SOURCES = tests/test_xpub_nodrop.cpp
test_xpub_nodrop_LDADD = libzmq.la
+test_heartbeats_SOURCES = tests/test_heartbeats.cpp
+test_heartbeats_LDADD = libzmq.la
+
if !ON_MINGW
test_apps += \
test_shutdown_stress \
diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt
index 0025ef5..0b5cc87 100644
--- a/doc/zmq_setsockopt.txt
+++ b/doc/zmq_setsockopt.txt
@@ -246,6 +246,47 @@ Option value unit:: milliseconds
Default value:: 30000
Applicable socket types:: all but ZMQ_STREAM, only for connection-oriented transports
+ZMQ_HEARTBEAT_IVL: Set interval between sending ZMTP heartbeats
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+The 'ZMQ_HEARTBEAT_IVL' option shall set the interval between sending ZMTP heartbeats
+for the specified 'socket'. If this option is set and is greater than 0, then a 'PING'
+ZMTP command will be sent every 'ZMQ_HEARTBEAT_IVL' milliseconds.
+
+[horizontal]
+Option value type:: int
+Option value unit:: milliseconds
+Default value:: 0
+Applicable socket types:: all, when using connection-oriented transports
+
+ZMQ_HEARTBEAT_TIMEOUT: Set timeout for ZMTP heartbeats
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+The 'ZMQ_HEARTBEAT_TIMEOUT' option shall set how long to wait before timing-out a
+connection after sending a 'PING' ZMTP command and not receiving any traffic. This
+option is only valid if 'ZMQ_HEARTBEAT_IVL' is also set, and is greater than 0. The
+connection will time out if there is no traffic received after sending the 'PING'
+command, but the received traffic does not have to be a 'PONG' command - any received
+traffic will cancel the timeout.
+
+[horizontal]
+Option value type:: int
+Option value unit:: milliseconds
+Default value:: -1 (unset; the 'ZMQ_HEARTBEAT_IVL' value is used as the timeout)
+Applicable socket types:: all, when using connection-oriented transports
+
+ZMQ_HEARTBEAT_TTL: Set the TTL value for ZMTP heartbeats
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+The 'ZMQ_HEARTBEAT_TTL' option shall set the timeout on the remote peer for ZMTP
+heartbeats. If this option is greater than 0, the remote side shall time out the
+connection if it does not receive any more traffic within the TTL period. This option
+does not have any effect if 'ZMQ_HEARTBEAT_IVL' is not set or is 0. Internally, this
+value is rounded down to the nearest decisecond, so any value less than 100 will have
+no effect.
+
+[horizontal]
+Option value type:: int
+Option value unit:: milliseconds
+Default value:: 0
+Applicable socket types:: all, when using connection-oriented transports
ZMQ_IDENTITY: Set socket identity
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
diff --git a/include/zmq.h b/include/zmq.h
index e6e9be7..00715c8 100644
--- a/include/zmq.h
+++ b/include/zmq.h
@@ -294,6 +294,9 @@ ZMQ_EXPORT const char *zmq_msg_gets (zmq_msg_t *msg, const char *property);
#define ZMQ_HANDSHAKE_IVL 66
#define ZMQ_SOCKS_PROXY 68
#define ZMQ_XPUB_NODROP 69
+#define ZMQ_HEARTBEAT_IVL 75
+#define ZMQ_HEARTBEAT_TTL 76
+#define ZMQ_HEARTBEAT_TIMEOUT 77
/* Message options */
#define ZMQ_MORE 1
diff --git a/src/curve_client.cpp b/src/curve_client.cpp
index 842e504..416227b 100644
--- a/src/curve_client.cpp
+++ b/src/curve_client.cpp
@@ -129,6 +129,8 @@ int zmq::curve_client_t::encode (msg_t *msg_)
uint8_t flags = 0;
if (msg_->flags () & msg_t::more)
flags |= 0x01;
+ if (msg_->flags () & msg_t::command)
+ flags |= 0x02;
uint8_t message_nonce [crypto_box_NONCEBYTES];
memcpy (message_nonce, "CurveZMQMESSAGEC", 16);
@@ -222,6 +224,8 @@ int zmq::curve_client_t::decode (msg_t *msg_)
const uint8_t flags = message_plaintext [crypto_box_ZEROBYTES];
if (flags & 0x01)
msg_->set_flags (msg_t::more);
+ if (flags & 0x02)
+ msg_->set_flags (msg_t::command);
memcpy (msg_->data (),
message_plaintext + crypto_box_ZEROBYTES + 1,
diff --git a/src/curve_server.cpp b/src/curve_server.cpp
index 7bdd8a9..a7d9ea7 100644
--- a/src/curve_server.cpp
+++ b/src/curve_server.cpp
@@ -141,6 +141,8 @@ int zmq::curve_server_t::encode (msg_t *msg_)
uint8_t flags = 0;
if (msg_->flags () & msg_t::more)
flags |= 0x01;
+ if (msg_->flags () & msg_t::command)
+ flags |= 0x02;
uint8_t *message_plaintext = static_cast <uint8_t *> (malloc (mlen));
alloc_assert (message_plaintext);
@@ -231,6 +233,8 @@ int zmq::curve_server_t::decode (msg_t *msg_)
const uint8_t flags = message_plaintext [crypto_box_ZEROBYTES];
if (flags & 0x01)
msg_->set_flags (msg_t::more);
+ if (flags & 0x02)
+ msg_->set_flags (msg_t::command);
memcpy (msg_->data (),
message_plaintext + crypto_box_ZEROBYTES + 1,
diff --git a/src/gssapi_mechanism_base.cpp b/src/gssapi_mechanism_base.cpp
index 355f152..bdd1836 100644
--- a/src/gssapi_mechanism_base.cpp
+++ b/src/gssapi_mechanism_base.cpp
@@ -80,6 +80,8 @@ int zmq::gssapi_mechanism_base_t::encode_message (msg_t *msg_)
uint8_t flags = 0;
if (msg_->flags () & msg_t::more)
flags |= 0x01;
+    //  Carry the command flag in bit 1 of the flags byte so that
+    //  decode_message can restore it on the receiving side.
+    if (msg_->flags () & msg_t::command)
+        flags |= 0x02;
uint8_t *plaintext_buffer = static_cast <uint8_t *>(malloc(msg_->size ()+1));
plaintext_buffer[0] = flags;
@@ -177,6 +179,8 @@ int zmq::gssapi_mechanism_base_t::decode_message (msg_t *msg_)
const uint8_t flags = static_cast <char *> (plaintext.value)[0];
if (flags & 0x01)
msg_->set_flags (msg_t::more);
+ if (flags & 0x02)
+ msg_->set_flags (msg_t::command);
memcpy (msg_->data (), static_cast <char *> (plaintext.value)+1, plaintext.length-1);
diff --git a/src/options.cpp b/src/options.cpp
index ea4f74c..677d2c9 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -66,7 +66,10 @@ zmq::options_t::options_t () :
gss_plaintext (false),
socket_id (0),
conflate (false),
- handshake_ivl (30000)
+ handshake_ivl (30000),
+ heartbeat_ttl (0),
+ heartbeat_interval (0),
+ heartbeat_timeout (-1)
{
}
@@ -503,6 +505,29 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
}
break;
+ case ZMQ_HEARTBEAT_IVL:
+ if (is_int && value >= 0) {
+ heartbeat_interval = value;
+ return 0;
+ }
+ break;
+
+        case ZMQ_HEARTBEAT_TTL:
+            //  Check the sign on the raw millisecond value: integer division
+            //  truncates toward zero, so testing after the division would
+            //  silently accept -99..-1 as a TTL of 0.
+            if (is_int && value >= 0 && value / 100 <= 6553) {
+                //  Stored internally in deciseconds (milliseconds / 100)
+                heartbeat_ttl = (uint16_t) (value / 100);
+                return 0;
+            }
+            break;
+
+ case ZMQ_HEARTBEAT_TIMEOUT:
+ if (is_int && value >= 0) {
+ heartbeat_timeout = value;
+ return 0;
+ }
+ break;
+
default:
#if defined (ZMQ_ACT_MILITANT)
// There are valid scenarios for probing with unknown socket option
@@ -849,6 +874,28 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
}
break;
+ case ZMQ_HEARTBEAT_IVL:
+ if (is_int) {
+ *value = heartbeat_interval;
+ return 0;
+ }
+ break;
+
+ case ZMQ_HEARTBEAT_TTL:
+ if (is_int) {
+ // Convert the internal deciseconds value to milliseconds
+ *value = heartbeat_ttl * 100;
+ return 0;
+ }
+ break;
+
+ case ZMQ_HEARTBEAT_TIMEOUT:
+ if (is_int) {
+ *value = heartbeat_timeout;
+ return 0;
+ }
+ break;
+
default:
#if defined (ZMQ_ACT_MILITANT)
malformed = false;
diff --git a/src/options.hpp b/src/options.hpp
index b4a019c..9a68340 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -189,6 +189,15 @@ namespace zmq
// close socket. Default is 30 secs. 0 means no handshake timeout.
int handshake_ivl;
+ // If remote peer receives a PING message and doesn't receive another
+ // message within the ttl value, it should close the connection
+ // (measured in tenths of a second)
+ uint16_t heartbeat_ttl;
+ // Time in milliseconds between sending heartbeat PING messages.
+ int heartbeat_interval;
+ // Time in milliseconds to wait for a PING response before disconnecting
+ int heartbeat_timeout;
+
};
}
diff --git a/src/session_base.cpp b/src/session_base.cpp
index 86bfd8f..30b6706 100644
--- a/src/session_base.cpp
+++ b/src/session_base.cpp
@@ -135,6 +135,8 @@ int zmq::session_base_t::pull_msg (msg_t *msg_)
int zmq::session_base_t::push_msg (msg_t *msg_)
{
+ if(msg_->flags() & msg_t::command)
+ return 0;
if (pipe && pipe->write (msg_)) {
int rc = msg_->init ();
errno_assert (rc == 0);
diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp
index 87186cc..adae809 100644
--- a/src/stream_engine.cpp
+++ b/src/stream_engine.cpp
@@ -96,6 +96,10 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_,
input_stopped (false),
output_stopped (false),
has_handshake_timer (false),
+ has_ttl_timer (false),
+ has_timeout_timer (false),
+ has_heartbeat_timer (false),
+ heartbeat_timeout (0),
socket (NULL)
{
int rc = tx_msg.init ();
@@ -142,6 +146,11 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_,
rc = setsockopt (s, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof (int));
errno_assert (rc == 0);
#endif
+ if(options.heartbeat_interval > 0) {
+ heartbeat_timeout = options.heartbeat_timeout;
+ if(heartbeat_timeout == -1)
+ heartbeat_timeout = options.heartbeat_interval;
+ }
}
zmq::stream_engine_t::~stream_engine_t ()
@@ -251,6 +260,20 @@ void zmq::stream_engine_t::unplug ()
has_handshake_timer = false;
}
+ if (has_ttl_timer) {
+ cancel_timer (heartbeat_ttl_timer_id);
+ has_ttl_timer = false;
+ }
+
+ if (has_timeout_timer) {
+ cancel_timer (heartbeat_timeout_timer_id);
+ has_timeout_timer = false;
+ }
+
+ if (has_heartbeat_timer) {
+ cancel_timer (heartbeat_ivl_timer_id);
+ has_heartbeat_timer = false;
+ }
// Cancel all fd subscriptions.
if (!io_error)
rm_fd (handle);
@@ -684,6 +707,11 @@ bool zmq::stream_engine_t::handshake ()
}
next_msg = &stream_engine_t::next_handshake_command;
process_msg = &stream_engine_t::process_handshake_command;
+
+ if(options.heartbeat_interval > 0) {
+ add_timer(options.heartbeat_interval, heartbeat_ivl_timer_id);
+ has_heartbeat_timer = true;
+ }
}
// Start polling for output if necessary.
@@ -887,6 +915,23 @@ int zmq::stream_engine_t::decode_and_push (msg_t *msg_)
if (mechanism->decode (msg_) == -1)
return -1;
+
+ if(has_timeout_timer) {
+ has_timeout_timer = false;
+ cancel_timer(heartbeat_timeout_timer_id);
+ }
+
+ if(has_ttl_timer) {
+ has_ttl_timer = false;
+ cancel_timer(heartbeat_ttl_timer_id);
+ }
+
+ if(msg_->flags() & msg_t::command) {
+ uint8_t cmd_id = *((uint8_t*)msg_->data());
+ if(cmd_id == 4)
+ process_heartbeat_message(msg_);
+ }
+
if (metadata)
msg_->set_metadata (metadata);
if (session->push_msg (msg_) == -1) {
@@ -952,9 +997,86 @@ void zmq::stream_engine_t::set_handshake_timer ()
void zmq::stream_engine_t::timer_event (int id_)
{
- zmq_assert (id_ == handshake_timer_id);
- has_handshake_timer = false;
+    //  Dispatch on which of the engine's timers fired.
+    if (id_ == handshake_timer_id) {
+        has_handshake_timer = false;
+        //  handshake timer expired before handshake completed, so engine fails
+        error (timeout_error);
+    }
+    else if (id_ == heartbeat_ivl_timer_id) {
+        //  Make the next outgoing message a PING, push it out now, then
+        //  re-add the interval timer for the following heartbeat.
+        next_msg = &stream_engine_t::produce_ping_message;
+        out_event ();
+        add_timer (options.heartbeat_interval, heartbeat_ivl_timer_id);
+    }
+    else if (id_ == heartbeat_ttl_timer_id) {
+        //  Nothing was received within the TTL announced by the peer's PING
+        has_ttl_timer = false;
+        error (timeout_error);
+    }
+    else if (id_ == heartbeat_timeout_timer_id) {
+        //  Nothing was received after our own PING within heartbeat_timeout
+        has_timeout_timer = false;
+        error (timeout_error);
+    }
+    else
+        //  There are no other valid timer ids! Use zmq_assert, as this
+        //  function did before, rather than the plain C assert.
+        zmq_assert (false);
+}
- // handshake timer expired before handshake completed, so engine fails
- error (timeout_error);
+//  Fill msg_ with a PING command, run it through the security mechanism
+//  and start the heartbeat timeout timer if one is configured and not
+//  already running. Returns the result of mechanism->encode ().
+int zmq::stream_engine_t::produce_ping_message(msg_t * msg_)
+{
+    zmq_assert (mechanism != NULL);
+
+    //  16-bit TTL + \4PING == 7
+    int rc = msg_->init_size (7);
+    //  Do not write into the message if its buffer could not be allocated
+    errno_assert (rc == 0);
+    msg_->set_flags (msg_t::command);
+    //  Copy in the command name (length byte 4, then "PING")
+    memcpy (msg_->data (), "\4PING", 5);
+
+    //  The TTL follows the name in network byte order
+    uint16_t ttl_val = htons (options.heartbeat_ttl);
+    memcpy (((uint8_t*) msg_->data ()) + 5, &ttl_val, sizeof (ttl_val));
+
+    rc = mechanism->encode (msg_);
+    //  The PING is a one-off: later messages come from the session again
+    next_msg = &stream_engine_t::pull_and_encode;
+    if (!has_timeout_timer && heartbeat_timeout > 0) {
+        add_timer (heartbeat_timeout, heartbeat_timeout_timer_id);
+        has_timeout_timer = true;
+    }
+    return rc;
+}
+
+//  Fill msg_ with a PONG command and run it through the security
+//  mechanism. Returns the result of mechanism->encode ().
+int zmq::stream_engine_t::produce_pong_message(msg_t * msg_)
+{
+    zmq_assert (mechanism != NULL);
+
+    //  \4PONG == 5 bytes (length byte 4, then the command name)
+    int rc = msg_->init_size (5);
+    //  Do not write into the message if its buffer could not be allocated
+    errno_assert (rc == 0);
+    msg_->set_flags (msg_t::command);
+
+    memcpy (msg_->data (), "\4PONG", 5);
+
+    rc = mechanism->encode (msg_);
+    //  The PONG is a one-off: later messages come from the session again
+    next_msg = &stream_engine_t::pull_and_encode;
+    return rc;
+}
+
+//  Handle a received heartbeat command. For a PING: start the TTL timer
+//  requested by the peer (if any, and if none is running) and schedule a
+//  PONG as the next outgoing message. Anything else is ignored.
+//  Always returns 0.
+int zmq::stream_engine_t::process_heartbeat_message(msg_t * msg_)
+{
+    //  A PING is "\4PING" (5 bytes) followed by a 16-bit TTL; check the
+    //  size first so a short frame from the peer is never read past its end.
+    if (msg_->size () >= 7 && memcmp (msg_->data (), "\4PING", 5) == 0) {
+        uint16_t remote_heartbeat_ttl;
+        //  Get the remote heartbeat TTL to setup the timer
+        memcpy (&remote_heartbeat_ttl, (uint8_t*) msg_->data () + 5,
+                sizeof (remote_heartbeat_ttl));
+        //  The remote heartbeat is in 10ths of a second, so we multiply it
+        //  by 100 to get the timer interval in ms. Do this in an int: in a
+        //  uint16_t the product wraps for any TTL above 655 deciseconds.
+        const int ttl_ms = ntohs (remote_heartbeat_ttl) * 100;
+
+        if (!has_ttl_timer && ttl_ms > 0) {
+            add_timer (ttl_ms, heartbeat_ttl_timer_id);
+            has_ttl_timer = true;
+        }
+
+        next_msg = &stream_engine_t::produce_pong_message;
+        out_event ();
+    }
+
+    return 0;
}
diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp
index d42c692..4978714 100644
--- a/src/stream_engine.hpp
+++ b/src/stream_engine.hpp
@@ -125,6 +125,10 @@ namespace zmq
void set_handshake_timer();
+ int produce_ping_message(msg_t * msg_);
+ int process_heartbeat_message(msg_t * msg_);
+ int produce_pong_message(msg_t * msg_);
+
// Underlying socket.
fd_t s;
@@ -204,6 +208,17 @@ namespace zmq
// True is linger timer is running.
bool has_handshake_timer;
+ // Heartbeat stuff
+ enum {
+ heartbeat_ivl_timer_id = 0x80,
+ heartbeat_timeout_timer_id = 0x81,
+ heartbeat_ttl_timer_id = 0x82
+ };
+ bool has_ttl_timer;
+ bool has_timeout_timer;
+ bool has_heartbeat_timer;
+ int heartbeat_timeout;
+
// Socket
zmq::socket_base_t *socket;
diff --git a/tests/test_heartbeats.cpp b/tests/test_heartbeats.cpp
new file mode 100644
index 0000000..c5aa060
--- /dev/null
+++ b/tests/test_heartbeats.cpp
@@ -0,0 +1,315 @@
+/*
+ Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "testutil.hpp"
+#if defined (ZMQ_HAVE_WINDOWS)
+# include <winsock2.h>
+# include <ws2tcpip.h>
+# include <stdexcept>
+# define close closesocket
+#else
+# include <sys/socket.h>
+# include <netinet/in.h>
+# include <arpa/inet.h>
+# include <unistd.h>
+#endif
+
+// Read one event off the monitor socket and return its event number.
+// The socket is polled without blocking at most twice, sleeping 150 ms
+// after each empty poll. Returns -1 if no event arrived in that time or
+// if the second (address) frame could not be received.
+
+static int
+get_monitor_event (void *monitor)
+{
+    for (int i = 0; i < 2; i++) {
+        // First frame in message contains event number and value
+        zmq_msg_t msg;
+        zmq_msg_init (&msg);
+        if (zmq_msg_recv (&msg, monitor, ZMQ_DONTWAIT) == -1) {
+            zmq_msg_close (&msg);
+            msleep (150);
+            continue; // Nothing available yet (or interrupted)
+        }
+        assert (zmq_msg_more (&msg));
+
+        // Copy the 16-bit event number out of the frame instead of
+        // dereferencing a cast pointer into the message buffer
+        uint16_t event;
+        memcpy (&event, zmq_msg_data (&msg), sizeof (event));
+        // Release the first frame before the message object is reused
+        zmq_msg_close (&msg);
+
+        // Second frame in message contains event address
+        zmq_msg_init (&msg);
+        if (zmq_msg_recv (&msg, monitor, 0) == -1) {
+            zmq_msg_close (&msg);
+            return -1; // Interrupted, presumably
+        }
+        assert (!zmq_msg_more (&msg));
+        zmq_msg_close (&msg);
+
+        return event;
+    }
+    return -1;
+}
+
+// Play the client side of a ZMTP 3 handshake over the plain TCP socket
+// `fd`: exchange the 64-byte greeting, then the 43-byte READY command.
+// Every send/recv is asserted to transfer the full amount in one call.
+static void
+mock_handshake (int fd) {
+    // Signature, version bytes and the "NULL" mechanism name; the rest of
+    // the 64-byte greeting is left zeroed.
+    const uint8_t zmtp_greeting[33] = { 0xff, 0, 0, 0, 0, 0, 0, 0, 0, 0x7f, 3, 0, 'N', 'U', 'L', 'L', 0 };
+    // READY command with the properties Socket-Type = DEALER and an
+    // empty Identity.
+    const uint8_t zmtp_ready[43] = {
+        4, 41, 5, 'R', 'E', 'A', 'D', 'Y', 11, 'S', 'o', 'c', 'k', 'e', 't', '-', 'T', 'y', 'p', 'e',
+        0, 0, 0, 6, 'D', 'E', 'A', 'L', 'E', 'R', 8, 'I', 'd', 'e', 'n', 't', 'i', 't', 'y',
+        0, 0, 0, 0
+    };
+    char buffer[128];
+    int nbytes;
+
+    // Greeting: ours out, the peer's in
+    memset(buffer, 0, sizeof(buffer));
+    memcpy(buffer, zmtp_greeting, sizeof(zmtp_greeting));
+    nbytes = send(fd, buffer, 64, 0);
+    assert(nbytes == 64);
+    nbytes = recv(fd, buffer, 64, 0);
+    assert(nbytes == 64);
+
+    // READY: ours out, the peer's in
+    memset(buffer, 0, sizeof(buffer));
+    memcpy(buffer, zmtp_ready, 43);
+    nbytes = send(fd, buffer, 43, 0);
+    assert(nbytes == 43);
+    nbytes = recv(fd, buffer, 43, 0);
+    assert(nbytes == 43);
+}
+
+// Configure `socket` for CURVE with a fixed test key pair. A non-zero
+// `is_server` applies the server key pair and sets ZMQ_CURVE_SERVER;
+// otherwise the client key pair is applied together with the server's
+// public key as ZMQ_CURVE_SERVERKEY. Return codes are not checked.
+static void
+setup_curve(void * socket, int is_server) {
+    // The server's public key doubles as the client's ZMQ_CURVE_SERVERKEY
+    const char *server_public = "rq:rM>}U?@Lns47E1%kR.o@n%FcmmsL/@{H8]yf7";
+
+    const char *secret_key = is_server
+        ? "JTKVSB%%)wK0E.X)V>+}o?pNmC{O&4W4b!Ni{Lh6"
+        : "D:)Q[IlAW!ahhC2ac:9*A}h:p?([4%wOTJ%JR%cs";
+    const char *public_key = is_server
+        ? server_public
+        : "Yne@$w-vo<fVvi]a<NY6T1ed:M$fCG*[IaLV{hID";
+
+    zmq_setsockopt(socket, ZMQ_CURVE_SECRETKEY, secret_key, strlen(secret_key));
+    zmq_setsockopt(socket, ZMQ_CURVE_PUBLICKEY, public_key, strlen(public_key));
+
+    if(!is_server) {
+        zmq_setsockopt(socket, ZMQ_CURVE_SERVERKEY, server_public, strlen(server_public));
+        return;
+    }
+    zmq_setsockopt(socket, ZMQ_CURVE_SERVER, &is_server, sizeof(is_server));
+}
+
+// Create the ROUTER socket under test, bound to tcp://127.0.0.1:5556, and
+// a PAIR socket connected to its monitor endpoint.
+//   set_heartbeats  non-zero: set ZMQ_HEARTBEAT_IVL to 50 ms on the server
+//   is_curve        non-zero: configure the server through setup_curve()
+//   server_out      receives the server socket
+//   mon_out         receives the monitor socket
+static void
+prep_server_socket(void * ctx, int set_heartbeats, int is_curve, void ** server_out, void ** mon_out)
+{
+    void *router = zmq_socket (ctx, ZMQ_ROUTER);
+    assert (router);
+
+    int linger = 0;
+    int rc = zmq_setsockopt (router, ZMQ_LINGER, &linger, sizeof (linger));
+    assert (rc == 0);
+
+    if(set_heartbeats) {
+        int interval = 50;
+        rc = zmq_setsockopt (router, ZMQ_HEARTBEAT_IVL, &interval, sizeof(interval));
+        assert (rc == 0);
+    }
+
+    if(is_curve)
+        setup_curve(router, 1);
+
+    rc = zmq_bind (router, "tcp://127.0.0.1:5556");
+    assert (rc == 0);
+
+    // Socket that collects the server's monitor events
+    void *monitor = zmq_socket (ctx, ZMQ_PAIR);
+    assert (monitor);
+
+    const int events = ZMQ_EVENT_CONNECTED | ZMQ_EVENT_DISCONNECTED | ZMQ_EVENT_ACCEPTED;
+    rc = zmq_socket_monitor (router, "inproc://monitor-dealer", events);
+    assert (rc == 0);
+
+    // Connect to the inproc endpoint so we'll get events
+    rc = zmq_connect (monitor, "inproc://monitor-dealer");
+    assert (rc == 0);
+
+    *server_out = router;
+    *mon_out = monitor;
+}
+
+// This checks for a broken TCP connection (or, in this case, a stuck one
+// where the peer never responds to PINGs). There should be an accepted event
+// then a disconnect event.
+static void
+test_heartbeat_timeout (void)
+{
+    int rc;
+
+    // Set up our context and sockets
+    void *ctx = zmq_ctx_new ();
+    assert (ctx);
+
+    // Server with heartbeats enabled and without CURVE
+    void * server, * server_mon;
+    prep_server_socket(ctx, 1, 0, &server, &server_mon);
+
+    // Zero the whole address first so that no member is handed to
+    // connect() uninitialized
+    struct sockaddr_in ip4addr;
+    memset(&ip4addr, 0, sizeof ip4addr);
+    ip4addr.sin_family = AF_INET;
+    ip4addr.sin_port = htons(5556);
+    rc = inet_pton(AF_INET, "127.0.0.1", &ip4addr.sin_addr);
+    assert (rc == 1);
+
+    int s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
+    assert (s != -1);
+    rc = connect (s, (struct sockaddr*) &ip4addr, sizeof ip4addr);
+    assert (rc > -1);
+
+    // Mock a ZMTP 3 client so we can forcibly time out a connection:
+    // after the handshake this peer never sends anything again
+    mock_handshake(s);
+
+    // By now everything should report as connected
+    rc = get_monitor_event(server_mon);
+    assert(rc == ZMQ_EVENT_ACCEPTED);
+
+    // We should have been disconnected
+    rc = get_monitor_event(server_mon);
+    assert(rc == ZMQ_EVENT_DISCONNECTED);
+
+    close(s);
+
+    rc = zmq_close (server);
+    assert (rc == 0);
+
+    rc = zmq_close (server_mon);
+    assert (rc == 0);
+
+    rc = zmq_ctx_term (ctx);
+    assert (rc == 0);
+}
+
+// This checks that peers respect the TTL value in ping messages.
+// We connect a DEALER client that sends pings carrying a TTL to a server
+// that is not doing any heartbeating itself. Then we sleep; if the server
+// disconnects the client, then we know the TTL did its thing correctly.
+static void
+test_heartbeat_ttl (void)
+{
+    int rc, value;
+
+    // Set up our context and sockets
+    void *ctx = zmq_ctx_new ();
+    assert (ctx);
+
+    // Server without heartbeats and without CURVE
+    void * server, * server_mon, *client;
+    prep_server_socket(ctx, 0, 0, &server, &server_mon);
+
+    client = zmq_socket(ctx, ZMQ_DEALER);
+    assert(client != NULL);
+
+    // Set the heartbeat TTL to 0.1 seconds
+    value = 100;
+    rc = zmq_setsockopt(client, ZMQ_HEARTBEAT_TTL, &value, sizeof(value));
+    assert(rc == 0);
+
+    // Set the heartbeat interval to much longer than the TTL so that
+    // the socket times out on the remote side.
+    value = 250;
+    rc = zmq_setsockopt(client, ZMQ_HEARTBEAT_IVL, &value, sizeof(value));
+    assert(rc == 0);
+
+    rc = zmq_connect(client, "tcp://localhost:5556");
+    assert(rc == 0);
+
+    // By now everything should report as connected
+    rc = get_monitor_event(server_mon);
+    assert(rc == ZMQ_EVENT_ACCEPTED);
+
+    msleep(100);
+
+    // We should have been disconnected
+    rc = get_monitor_event(server_mon);
+    assert(rc == ZMQ_EVENT_DISCONNECTED);
+
+    rc = zmq_close (server);
+    assert (rc == 0);
+
+    rc = zmq_close (server_mon);
+    assert (rc == 0);
+
+    rc = zmq_close (client);
+    assert (rc == 0);
+
+    rc = zmq_ctx_term (ctx);
+    assert (rc == 0);
+}
+
+// This checks for normal operation - that is pings and pongs being
+// exchanged normally. There should be an accepted event on the server,
+// and then no event afterwards. A non-zero `is_curve` runs the same
+// scenario with both sockets configured through setup_curve().
+static void
+test_heartbeat_notimeout (int is_curve)
+{
+    int rc;
+
+    // Set up our context and sockets
+    void *ctx = zmq_ctx_new ();
+    assert (ctx);
+
+    // Server with heartbeats enabled
+    void * server, * server_mon;
+    prep_server_socket(ctx, 1, is_curve, &server, &server_mon);
+
+    void * client = zmq_socket(ctx, ZMQ_DEALER);
+    assert (client != NULL);
+    if(is_curve)
+        setup_curve(client, 0);
+    rc = zmq_connect(client, "tcp://127.0.0.1:5556");
+    assert (rc == 0);
+
+    // Give it a sec to connect and handshake
+    msleep(100);
+
+    // By now everything should report as connected
+    rc = get_monitor_event(server_mon);
+    assert(rc == ZMQ_EVENT_ACCEPTED);
+
+    // We should still be connected because pings and pongs are happenin'
+    rc = get_monitor_event(server_mon);
+    assert(rc == -1);
+
+    rc = zmq_close (client);
+    assert (rc == 0);
+
+    rc = zmq_close (server);
+    assert (rc == 0);
+
+    rc = zmq_close (server_mon);
+    assert (rc == 0);
+
+    rc = zmq_ctx_term (ctx);
+    assert (rc == 0);
+}
+
+// Run every heartbeat scenario in turn. The no-timeout scenario runs
+// twice: first without CURVE, then with it.
+int main (void)
+{
+    setup_test_environment();
+    test_heartbeat_timeout();
+    test_heartbeat_ttl();
+    for(int use_curve = 0; use_curve <= 1; use_curve++)
+        test_heartbeat_notimeout(use_curve);
+    return 0;
+}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment