Skip to content

Instantly share code, notes, and snippets.

@BastienDurel
Last active August 29, 2015 14:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save BastienDurel/92593ea3121c293148d9 to your computer and use it in GitHub Desktop.
Save BastienDurel/92593ea3121c293148d9 to your computer and use it in GitHub Desktop.
push/pull test
run server on machine A, run client on machine B
then run on machine A
iptables -I INPUT -p tcp -m tcp --dport 5555 -j DROP ; sleep 2 ; iptables -D INPUT -p tcp -m tcp --dport 5555 -j DROP
before client sends all messages
I've included pcap traces of a test run. I put a 120ms delay between each message, reduced the message count, and activated a fast TCP keepalive (idle = 1, cnt = 1, intvl = 1)
In the "bad" trace I applied the DROP rule at approx. t+2.3
I got this in server :
error: id is 36 and last id was 19
got end, count = 85
#include <memory>
#include <stdexcept>
#include <string>
#include <iostream>
#include <cstring>
#include <zmq.h>
#include "common.h"
using std::string;
using std::unique_ptr;
using std::runtime_error;
using std::cout;
using std::endl;
// ZeroMQ context shared by the whole client; created in main().
void * ctx = 0;
// Sends message number i on the given socket (defined after main()).
void send(void*, int);
/**
 * Client entry point: pushes a numbered burst of messages to the server.
 *
 * Connects a ZMQ_PUSH socket to the endpoint given as the first command-line
 * argument (default "tcp://127.0.0.1:5555"), sends max_msg "hello #<i>"
 * messages followed by the terminating message (send(sock, -1)), then tears
 * the socket and context down. The digits 1..6 printed on stdout mark the
 * progress through these stages.
 *
 * Any ZeroMQ failure is reported through ERR(), which throws.
 */
int main(int c, const char**v) {
    int rc;

    ctx = zmq_ctx_new();
    // Report a failed context creation directly instead of handing a null
    // context to zmq_socket() and blaming that call.
    if (!ctx)
        ERR("zmq_ctx_new");

    auto sock = zmq_socket(ctx, ZMQ_PUSH);
    if (!sock)
        ERR("zmq_socket");
    cout << 1 << endl;

    // The endpoint can be overridden by the first command-line argument.
    const char * endpoint = "tcp://127.0.0.1:5555";
    if (c > 1)
        endpoint = v[1];

    // Send high-water mark, set before connecting.
    const int wm = WM_VALUE;
    rc = zmq_setsockopt(sock, ZMQ_SNDHWM, &wm, sizeof(wm));
    if (rc != 0)
        ERR("zmq_setsockopt");
    set_keepalive_options(sock);

    rc = zmq_connect(sock, endpoint);
    if (rc != 0)
        ERR("zmq_connect");
    cout << 2 << endl;

    constexpr int max_msg = 10000000;
    for (int i = 0; i < max_msg; ++i) {
        send(sock, i);
    }
    // -1 makes send() emit the "end" message.
    send(sock, -1);
    cout << 3 << endl;

    rc = zmq_disconnect(sock, endpoint);
    if (rc != 0)
        ERR("zmq_disconnect");
    cout << 4 << endl;

    rc = zmq_close(sock);
    if (rc != 0)
        ERR("zmq_close");
    cout << 5 << endl;

    rc = zmq_ctx_term(ctx);
    if (rc != 0)
        ERR("zmq_ctx_term");
    cout << 6 << endl;
    return 0;
}
/**
 * Owning wrapper around a zmq_msg_t.
 *
 * The constructors initialise the message (empty, or with a buffer of `size`
 * bytes) and report failure through ERR(), which throws. The destructor closes
 * the message. The object converts implicitly to zmq_msg_t* so it can be
 * passed straight to the zmq_msg_* functions.
 *
 * Copying is disabled: two wrappers closing the same zmq_msg_t would release
 * it twice.
 */
struct msg {
    zmq_msg_t _m;

    /// Initialises an empty message.
    msg() {
        if (zmq_msg_init(&_m) != 0)
            ERR("zmq_msg_init");
    }

    /// Initialises a message with a buffer of `size` bytes.
    explicit msg(int size) {
        if (zmq_msg_init_size(&_m, size) != 0)
            ERR("zmq_msg_init_size");
    }

    msg(const msg&) = delete;
    msg& operator=(const msg&) = delete;

    /// Closes the message. A destructor must not throw (it is implicitly
    /// noexcept, so throwing would terminate the process), so a failed close
    /// is only reported on stderr.
    ~msg() {
        if (zmq_msg_close(&_m) != 0)
            std::cerr << "zmq_msg_close: " << strerror(errno) << std::endl;
    }

    operator zmq_msg_t*() { return &_m; }
};
void send(void* sock, int i) {
int rc = 0;
string m{"hello #"};
m.append(std::to_string(i));
if (i == -1)
m = "end";
msg message{(int)m.size()};
memmove(zmq_msg_data(message), m.c_str(), m.size());
const int size = zmq_msg_size(message);
rc = zmq_msg_send(message, sock, 0);
if (rc != size)
throw runtime_error("zmq_msg_send");
//cout << '.' << i << endl;
}
// -*- mode: c++ -*-
#if !defined TEST_PUSHPULL_COMMON_INCLUDED
#define TEST_PUSHPULL_COMMON_INCLUDED 1
#include <zmq.h>
// High-water mark value: the client applies it as ZMQ_SNDHWM, the server as ZMQ_RCVHWM.
#define WM_VALUE 100000000
/**
 * Throws a std::runtime_error describing a failed call.
 *
 * The exception text is "<err>: <strerror(errno)>", so this must be called
 * right after the failing function, while errno still holds its error code.
 *
 * @param err  Name of the failing call (or any other prefix for the message).
 * @throws std::runtime_error always; this function never returns.
 */
[[noreturn]] inline void ERR(std::string&& err) {
    // Capture errno first: the appends below may allocate and disturb it.
    const int saved_errno = errno;
    err.append(": ").append(strerror(saved_errno));
    throw std::runtime_error(err);
}
/**
 * Enables aggressive TCP keepalive on a ZeroMQ socket.
 *
 * Only active when the build defines DOKEEPALIVE to a value greater than 0;
 * otherwise the function does nothing. Every option is set to 1:
 *   - ZMQ_TCP_KEEPALIVE        use TCP SO_KEEPALIVE
 *   - ZMQ_TCP_KEEPALIVE_IDLE   start keepalives after this idle period
 *   - ZMQ_TCP_KEEPALIVE_CNT    number of keepalives before death
 *   - ZMQ_TCP_KEEPALIVE_INTVL  interval between keepalives
 *
 * @param sock  ZeroMQ socket to configure.
 * @throws std::runtime_error (via ERR) if any zmq_setsockopt call fails.
 */
inline void set_keepalive_options(void* sock) {
#if defined DOKEEPALIVE && DOKEEPALIVE > 0
    struct keepalive_setting {
        int option;
        int value;
    };
    // Applied in this order; the first failure aborts via ERR().
    const keepalive_setting settings[] = {
        {ZMQ_TCP_KEEPALIVE, 1},
        {ZMQ_TCP_KEEPALIVE_IDLE, 1},
        {ZMQ_TCP_KEEPALIVE_CNT, 1},
        {ZMQ_TCP_KEEPALIVE_INTVL, 1},
    };
    for (const keepalive_setting& entry : settings) {
        if (zmq_setsockopt (sock, entry.option, &entry.value, sizeof(entry.value)) != 0)
            ERR("zmq_setsockopt");
    }
#endif
}
#endif// ~TEST_PUSHPULL_COMMON_INCLUDED
# Build rules for the push/pull test: one client and one server binary.
SRCC=client.cpp
SRCS=server.cpp
OBJC=$(SRCC:.cpp=.o)
OBJS=$(SRCS:.cpp=.o)

# TCP keepalive is compiled in unless make is invoked with NOKEEPALIVE=1.
ifneq ($(NOKEEPALIVE),1)
KA=-DDOKEEPALIVE=1
endif

OPT=-g -O2
CXXFLAGS=$(shell pkg-config --cflags libzmq) --std=c++11 $(OPT) $(KA)
LDFLAGS=$(shell pkg-config --libs libzmq) $(OPT)

# These targets do not produce files of the same name; declaring them phony
# keeps them working even if a file called "all", "clean", ... exists.
.PHONY: all test-client test-server clean fclean

all: client server

# Run the client and report how long it took.
test-client: client
	bash -c "time ./client"

test-server: server
	./server

client: $(OBJC)
	$(CXX) -o $@ $(OBJC) $(LDFLAGS)

server: $(OBJS)
	$(CXX) -o $@ $(OBJS) $(LDFLAGS)

# Both programs include common.h.
$(OBJC): common.h
$(OBJS): common.h

# Remove object files and editor backups.
clean:
	rm -f $(OBJS) $(OBJC) *~

# Remove the binaries as well.
fclean: clean
	rm -f client server
#include <memory>
#include <stdexcept>
#include <string>
#include <cstring>
#include <cstdlib>
#include <thread>
#include <iostream>
#include <zmq.h>
#include "common.h"
using std::string;
using std::unique_ptr;
using std::runtime_error;
using std::cout;
using std::endl;
// ZeroMQ context shared by the whole server; created in main().
void * ctx = 0;
// Condition of the receive loop in main(); the only assignment that would
// clear it (in zrecv) is commented out.
bool loop = true;
// Let the constexpr declarations below degrade to plain const on a pre-C++11 compiler.
#if __cplusplus < 201103L
#define constexpr const
#endif
// Receives and checks one message from the given socket (defined after main()).
void zrecv(void*);
/**
 * Server entry point: binds a ZMQ_PULL socket on tcp://*:5555 and hands every
 * incoming message to zrecv() for as long as `loop` stays true.
 *
 * A std::runtime_error thrown by zrecv() is printed on stdout and ends the
 * program with exit code 1. Failures of the setup and teardown calls are
 * reported through ERR(), which throws.
 */
int main() {
    int rc;

    ctx = zmq_ctx_new();
    // Report a failed context creation directly instead of handing a null
    // context to zmq_socket() and blaming that call.
    if (!ctx)
        ERR("zmq_ctx_new");

    auto sock = zmq_socket(ctx, ZMQ_PULL);
    if (!sock)
        ERR("zmq_socket");

    // Receive high-water mark, set before binding.
    const int wm = WM_VALUE;
    rc = zmq_setsockopt(sock, ZMQ_RCVHWM, &wm, sizeof(wm));
    if (rc != 0)
        ERR("zmq_setsockopt");
    set_keepalive_options(sock);

    rc = zmq_bind(sock, "tcp://*:5555");
    if (rc != 0)
        ERR("zmq_bind");

    while (loop) {
        try {
            zrecv(sock);
        } catch (const runtime_error& ex) {
            cout << ex.what() << endl;
            return 1;
        }
    }

    rc = zmq_close(sock);
    if (rc != 0)
        ERR("zmq_close");
    rc = zmq_ctx_term(ctx);
    if (rc != 0)
        ERR("zmq_ctx_term");
    return 0;
}
void zrecv(void* sock) {
static size_t count = 0;
static long last_id = -1;
zmq_msg_t message;
int rc = zmq_msg_init(&message);
if (rc != 0)
ERR("zmq_msg_init");
//constexpr int flags = ZMQ_DONTWAIT;
constexpr int flags = 0;
rc = zmq_msg_recv(&message, sock, flags);
if (rc < 0) {
if (errno != EAGAIN)
ERR("zmq_msg_recv");
std::this_thread::yield();
return;
}
// TODO: handling
//cout << "got " << rc << " bytes" << endl;
string m{static_cast<const char*>(zmq_msg_data(&message)),
static_cast<string::size_type>(rc)};
if (m == "end") {
last_id = -1;
cout << "got end, count = " << count << endl;
//loop = false;
}
else {
if (m.compare(0, 7, "hello #") == 0) {
long id = std::strtol(m.c_str() + 7, nullptr, 10);
if (id > last_id + 1 && last_id != -1)
{
cout << "error: id is " << id << " and last id was " << last_id << endl;
}
last_id = id;
} else {
string err{"bad message: "};
err.append(m);
throw runtime_error(err.c_str());
}
}
++count;
if ((count & 0x03ffff) == 0)
cout << count << endl;
rc = zmq_msg_close (&message);
if (rc != 0)
ERR("zmq_msg_close");
}
@BastienDurel
Copy link
Author

I've made tests with server under Arch (zeromq 4.0.5/gcc 4.9.2) and client under Arch, Debian stable (zeromq 3.2.3+dfsg-2~bpo70+1/gcc 4.7.2-5), Debian sid (zeromq 4.0.5+dfsg-2/gcc 4.9.2), Windows (zeromq 4.0.4 / msvc12 or netmq)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment