Skip to content

Instantly share code, notes, and snippets.

@laplaceyang
Last active January 8, 2017 13:05
Show Gist options
  • Save laplaceyang/40374e2b40a5bfec56c7ce69749418eb to your computer and use it in GitHub Desktop.
Save laplaceyang/40374e2b40a5bfec56c7ce69749418eb to your computer and use it in GitHub Desktop.
test case for zeromq issues#2228
// The following code simulates a typical request/response service:
//
// [Req] usr -> frontend svr -> backend svr
// [Rsp] backend svr -> frontend svr -> usr
//
// If we build the following code with zmq 4.2.0,
// we can only receive up to 1000 msgs from the backend;
// but if we build it with zmq 4.1.6,
// we receive all the response msgs from the backend.
#include <unistd.h>
#include <cassert>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include "zmq.hpp"
// ZeroMQ context shared by every socket created in this file.
zmq::context_t ctx(1);
// Number of messages main() sends; it must be larger than 1000 (the default
// HWM) for the test to be meaningful.
int amount = 5000;
//front svr
class FrontendSvr {
public:
FrontendSvr() : frontend_sock_(NULL), backend_sock_(NULL), msg_send_(0), msg_recv_(0) {}
~FrontendSvr() {delete frontend_sock_; delete backend_sock_;}
void init() {
//backend sock is for connect to backend svr, and set hwm after connect
backend_sock_ = new zmq::socket_t(ctx, ZMQ_DEALER);
backend_sock_->connect("tcp://127.0.0.1:88888");
int hwm = 100;
backend_sock_->setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm));
backend_sock_->setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm));
//for recv msg from main
frontend_sock_ = new zmq::socket_t(ctx, ZMQ_ROUTER);
frontend_sock_->bind("inproc://queue");
}
//loop
void run() {
zmq::pollitem_t items[2];
items[0].socket = (void*) *frontend_sock_;
items[0].fd = 0;
items[0].events = ZMQ_POLLIN;
items[1].socket = (void*) *backend_sock_;
items[1].fd = 0;
items[1].events = ZMQ_POLLIN;
while(1) {
int rc = zmq::poll(items, 2, -1);
if(rc < 0)
continue;
if(items[0].revents & ZMQ_POLLIN) {
on_request();
}
if(items[1].revents & ZMQ_POLLIN) {
on_response();
}
if((msg_send_ > 0 && msg_send_ % 100 == 0) || (msg_recv_ > 0 && msg_recv_ % 100 == 0))
std::cout << "[FRONTEND SVR] send msg of " << (double) msg_send_ / amount * 100
<< "% and recv msg of " << (double) msg_recv_ / amount * 100 << "%\n";
if(msg_send_ == amount || msg_recv_ == amount)
print_stat();
}
}
// msg in of inproc
void on_request() {
zmq::message_t msg;
int more = 1;
size_t more_size = sizeof(more);
while(more) {
int rc = frontend_sock_->recv(&msg, 0);
assert(rc > 0);
frontend_sock_->getsockopt(ZMQ_RCVMORE, &more, &more_size);
rc = backend_sock_->send(msg, more ? ZMQ_SNDMORE : 0);
assert(rc > 0);
}
msg_send_++;
}
//msg in of tcp
void on_response() {
zmq::message_t msg;
int more = 1;
size_t more_size = sizeof(more);
while(more) {
int rc = backend_sock_->recv(&msg, 0);
assert(rc > 0);
backend_sock_->getsockopt(ZMQ_RCVMORE, &more, &more_size);
}
msg_recv_++;
}
void print_stat() {
std::cout << "[FRONTEND SVR: "<<__func__<<"] line: " << __LINE__
<< " msg recv: " << msg_recv_ << ", msg send: " << msg_send_ << "\n";
}
private:
zmq::socket_t* frontend_sock_;
zmq::socket_t* backend_sock_;
int msg_send_;
int msg_recv_;
};
//backend svr
class BackendSvr {
public:
BackendSvr() : frontend_sock_(NULL), msg_count_(0) {}
~BackendSvr() {delete frontend_sock_;}
//just listen to tcp
void init() {
frontend_sock_ = new zmq::socket_t(ctx, ZMQ_ROUTER);
frontend_sock_->bind("tcp://127.0.0.1:88888");
}
//loop
void run() {
zmq::pollitem_t items[1];
items[0].socket = (void*) *frontend_sock_;
items[0].fd = 0;
items[0].events = ZMQ_POLLIN;
while(1) {
int rc = zmq::poll(items, 1, -1);
if(rc < 0)
continue;
if(items[0].revents & ZMQ_POLLIN) {
on_request();
}
if(msg_count_ > 0 && msg_count_ % 100 == 0)
std::cout << "[BACKEND SVR] msg of " << (double) msg_count_ / amount * 100 << "%\n";
if(msg_count_ == amount)
print_stat();
}
}
//just rcv msg from front svr and send back
void on_request() {
zmq::message_t msg;
int more = 1;
size_t more_size = sizeof(more);
while(more) {
int rc = frontend_sock_->recv(&msg, 0);
assert(rc > 0);
frontend_sock_->getsockopt(ZMQ_RCVMORE, &more, &more_size);
rc = frontend_sock_->send(msg, more ? ZMQ_SNDMORE : 0);
assert(rc > 0);
}
msg_count_++;
}
void print_stat() {
std::cout << "[BACKEND SVR: "<<__func__<<"] line: " << __LINE__
<< " msg count: " << msg_count_ << "\n";
}
private:
zmq::socket_t* frontend_sock_;
int msg_count_;
};
// wrap for thread
void run_frontend_svr() {
FrontendSvr svr;
svr.init();
std::cout << "frontend svr inited.\n";
svr.run();
}
//wrap for thread
void run_backend_svr() {
BackendSvr svr;
svr.init();
std::cout << "backend svr inited.\n";
svr.run();
}
// Entry point: starts the backend and frontend servers on their own threads,
// then pushes `amount` requests into the frontend over "inproc://queue" and
// finally joins both threads.
int main(int argc, char **argv) {
    (void) argc;
    (void) argv;

    // Start the backend first and give each server a second to finish its
    // bind/connect before the next stage starts.
    std::thread backend_thread(&run_backend_svr);
    sleep(1);
    std::thread frontend_thread(&run_frontend_svr);
    sleep(1);

    // Socket used to feed requests to the frontend server.
    zmq::socket_t req(ctx, ZMQ_PUSH);
    req.connect("inproc://queue");

    const std::string str("msg to be send");
    for (int i = 1; i <= amount; ++i) {
        // Build a fresh message for every request and pass it by reference.
        // Passing `&msg, 0` would bind to the raw-buffer overload
        // send(const void*, size_t, int) with a length of 0 and transmit an
        // empty frame, and a message object cannot be reused after it has
        // been sent.
        zmq::message_t msg(str.data(), str.size());
        req.send(msg, 0);
        if (i % 100 == 0) {
            std::cout << "[main] send msg of " << (double) i / amount * 100 << "% \n";
        }
    }

    frontend_thread.join();
    backend_thread.join();
    return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment