Last active
January 8, 2017 13:05
-
-
Save laplaceyang/40374e2b40a5bfec56c7ce69749418eb to your computer and use it in GitHub Desktop.
test case for zeromq issues#2228
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// The follow code is to simulate a normal service | |
// | |
// [Req] usr -> frontend svr -> backend svr | |
// [Rsp] backend svr -> frontend svr -> usr | |
// | |
// if we build follow code with zmq 4.2.0 | |
// then we can only recv up to 1000 msg from backend | |
// but if we build with zmq 4.1.6 | |
// we can recv all the response msg from backend | |
#include <unistd.h> | |
#include <iostream> | |
#include <string> | |
#include <thread> | |
#include "zmq.hpp" | |
zmq::context_t ctx(1); | |
//msg amount should must bigger than 1000 (default hwm) | |
int amount = 5000; | |
//front svr | |
class FrontendSvr { | |
public: | |
FrontendSvr() : frontend_sock_(NULL), backend_sock_(NULL), msg_send_(0), msg_recv_(0) {} | |
~FrontendSvr() {delete frontend_sock_; delete backend_sock_;} | |
void init() { | |
//backend sock is for connect to backend svr, and set hwm after connect | |
backend_sock_ = new zmq::socket_t(ctx, ZMQ_DEALER); | |
backend_sock_->connect("tcp://127.0.0.1:88888"); | |
int hwm = 100; | |
backend_sock_->setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm)); | |
backend_sock_->setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm)); | |
//for recv msg from main | |
frontend_sock_ = new zmq::socket_t(ctx, ZMQ_ROUTER); | |
frontend_sock_->bind("inproc://queue"); | |
} | |
//loop | |
void run() { | |
zmq::pollitem_t items[2]; | |
items[0].socket = (void*) *frontend_sock_; | |
items[0].fd = 0; | |
items[0].events = ZMQ_POLLIN; | |
items[1].socket = (void*) *backend_sock_; | |
items[1].fd = 0; | |
items[1].events = ZMQ_POLLIN; | |
while(1) { | |
int rc = zmq::poll(items, 2, -1); | |
if(rc < 0) | |
continue; | |
if(items[0].revents & ZMQ_POLLIN) { | |
on_request(); | |
} | |
if(items[1].revents & ZMQ_POLLIN) { | |
on_response(); | |
} | |
if((msg_send_ > 0 && msg_send_ % 100 == 0) || (msg_recv_ > 0 && msg_recv_ % 100 == 0)) | |
std::cout << "[FRONTEND SVR] send msg of " << (double) msg_send_ / amount * 100 | |
<< "% and recv msg of " << (double) msg_recv_ / amount * 100 << "%\n"; | |
if(msg_send_ == amount || msg_recv_ == amount) | |
print_stat(); | |
} | |
} | |
// msg in of inproc | |
void on_request() { | |
zmq::message_t msg; | |
int more = 1; | |
size_t more_size = sizeof(more); | |
while(more) { | |
int rc = frontend_sock_->recv(&msg, 0); | |
assert(rc > 0); | |
frontend_sock_->getsockopt(ZMQ_RCVMORE, &more, &more_size); | |
rc = backend_sock_->send(msg, more ? ZMQ_SNDMORE : 0); | |
assert(rc > 0); | |
} | |
msg_send_++; | |
} | |
//msg in of tcp | |
void on_response() { | |
zmq::message_t msg; | |
int more = 1; | |
size_t more_size = sizeof(more); | |
while(more) { | |
int rc = backend_sock_->recv(&msg, 0); | |
assert(rc > 0); | |
backend_sock_->getsockopt(ZMQ_RCVMORE, &more, &more_size); | |
} | |
msg_recv_++; | |
} | |
void print_stat() { | |
std::cout << "[FRONTEND SVR: "<<__func__<<"] line: " << __LINE__ | |
<< " msg recv: " << msg_recv_ << ", msg send: " << msg_send_ << "\n"; | |
} | |
private: | |
zmq::socket_t* frontend_sock_; | |
zmq::socket_t* backend_sock_; | |
int msg_send_; | |
int msg_recv_; | |
}; | |
//backend svr | |
class BackendSvr { | |
public: | |
BackendSvr() : frontend_sock_(NULL), msg_count_(0) {} | |
~BackendSvr() {delete frontend_sock_;} | |
//just listen to tcp | |
void init() { | |
frontend_sock_ = new zmq::socket_t(ctx, ZMQ_ROUTER); | |
frontend_sock_->bind("tcp://127.0.0.1:88888"); | |
} | |
//loop | |
void run() { | |
zmq::pollitem_t items[1]; | |
items[0].socket = (void*) *frontend_sock_; | |
items[0].fd = 0; | |
items[0].events = ZMQ_POLLIN; | |
while(1) { | |
int rc = zmq::poll(items, 1, -1); | |
if(rc < 0) | |
continue; | |
if(items[0].revents & ZMQ_POLLIN) { | |
on_request(); | |
} | |
if(msg_count_ > 0 && msg_count_ % 100 == 0) | |
std::cout << "[BACKEND SVR] msg of " << (double) msg_count_ / amount * 100 << "%\n"; | |
if(msg_count_ == amount) | |
print_stat(); | |
} | |
} | |
//just rcv msg from front svr and send back | |
void on_request() { | |
zmq::message_t msg; | |
int more = 1; | |
size_t more_size = sizeof(more); | |
while(more) { | |
int rc = frontend_sock_->recv(&msg, 0); | |
assert(rc > 0); | |
frontend_sock_->getsockopt(ZMQ_RCVMORE, &more, &more_size); | |
rc = frontend_sock_->send(msg, more ? ZMQ_SNDMORE : 0); | |
assert(rc > 0); | |
} | |
msg_count_++; | |
} | |
void print_stat() { | |
std::cout << "[BACKEND SVR: "<<__func__<<"] line: " << __LINE__ | |
<< " msg count: " << msg_count_ << "\n"; | |
} | |
private: | |
zmq::socket_t* frontend_sock_; | |
int msg_count_; | |
}; | |
// wrap for thread | |
void run_frontend_svr() { | |
FrontendSvr svr; | |
svr.init(); | |
std::cout << "frontend svr inited.\n"; | |
svr.run(); | |
} | |
//wrap for thread | |
void run_backend_svr() { | |
BackendSvr svr; | |
svr.init(); | |
std::cout << "backend svr inited.\n"; | |
svr.run(); | |
} | |
//main func | |
int main(int argc, char **argv) { | |
std::thread backend_thread(&run_backend_svr); | |
sleep(1); | |
std::thread frontend_thread(&run_frontend_svr); | |
sleep(1); | |
//msg send to front svr | |
zmq::socket_t req(ctx, ZMQ_PUSH); | |
req.connect("inproc://queue"); | |
std::string str("msg to be send"); | |
zmq::message_t msg(str.data(), str.size()); | |
int i = 0; | |
while(i++ < amount) { | |
req.send(&msg, 0); | |
if(i % 100 == 0) { | |
std::cout << "[main] send msg of " << (double)i / amount * 100 << "% \n"; | |
} | |
} | |
frontend_thread.join(); | |
backend_thread.join(); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment