Created
June 3, 2021 15:11
-
-
Save Jihadist/895496e17d869eae5ca999c72dc25eda to your computer and use it in GitHub Desktop.
zeromq ROUTER-DEALER example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <chrono>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <functional>
#include <iomanip>
#include <iostream>
#include <memory>
#include <thread>
#include <vector>

#include <zmq.hpp>
#define within(num) (int)((float)((num)*random()) / (RAND_MAX + 1.0)) | |
// This is our client task class. | |
// It connects to the server, and then sends a request once per second | |
// It collects responses as they arrive, and it prints them out. We will | |
// run several client tasks in parallel, each with a different random ID. | |
// Attention! -- this random work well only on linux. | |
class client_task { | |
public: | |
client_task() | |
: ctx_(1) | |
, client_socket_(ctx_, ZMQ_DEALER) | |
{ | |
} | |
int counter_rep = 0; | |
int counter_req = 0; | |
void dump(zmq::socket_t& socket); | |
void start() | |
{ | |
// generate random identity | |
char identity[10] = {}; | |
sprintf(identity, "%04X-%04X", within(0x10000), within(0x10000)); | |
printf("%s\n", identity); | |
client_socket_.set(zmq::sockopt::routing_id, identity); | |
; | |
client_socket_.connect("ipc:///tmp/feeds/0"); | |
zmq::pollitem_t items[] = { | |
{ static_cast<void*>(client_socket_), 0, ZMQ_POLLIN, 0 } | |
}; | |
int request_nbr = 0; | |
try { | |
while (true) { | |
for (int i = 0; i < 100; ++i) { | |
// 10 milliseconds | |
zmq::poll(items, 1, 10); | |
if (items[0].revents & ZMQ_POLLIN) { | |
printf("\n%s ", identity); | |
dump(client_socket_); | |
} | |
} | |
int requests = within(5); | |
for (int request = 0; request < requests; ++request) { | |
std::this_thread::sleep_for(std::chrono_literals::operator""ms(within(1000) + 1)); | |
char request_string[16] = {}; | |
sprintf(request_string, "request #%d", ++request_nbr); | |
zmq::message_t request_message(request_string, strlen(request_string)); | |
std::cout << "Client send message " << request << " : " << request_message.str() << std::endl; | |
client_socket_.send(request_message, zmq::send_flags::none); | |
counter_req++; | |
} | |
} | |
} catch (std::exception& e) { | |
} | |
} | |
private: | |
zmq::context_t ctx_; | |
zmq::socket_t client_socket_; | |
}; | |
// Receives and prints every part of one multipart message on `socket`.
// Each part is shown as text when all bytes are printable ASCII, otherwise
// as zero-padded hex bytes.
void client_task::dump(zmq::socket_t& socket)
{
    std::cout << "----------------------------------------" << std::endl;
    while (1) {
        // Process all parts of the message. A blocking recv only returns
        // once a part has arrived (nullopt is possible in non-blocking
        // mode only), so the guard is purely defensive.
        zmq::message_t message;
        if (!socket.recv(message, zmq::recv_flags::none))
            break;
        counter_rep++;
        std::cout << "Client worker received : " << message.str() << std::endl;
        // Dump the message as text or binary.
        size_t size = message.size();
        std::string data(static_cast<char*>(message.data()), size);
        // Text only if every byte is printable ASCII.
        bool is_text = true;
        for (size_t char_nbr = 0; char_nbr < size; char_nbr++) {
            unsigned char byte = static_cast<unsigned char>(data[char_nbr]);
            if (byte < 32 || byte > 127)
                is_text = false;
        }
        std::cout << "[" << std::setfill('0') << std::setw(3) << size << "]";
        for (size_t char_nbr = 0; char_nbr < size; char_nbr++) {
            if (is_text)
                std::cout << (char)data[char_nbr];
            else
                // Cast through unsigned char first: a plain (signed) char
                // would sign-extend and print as e.g. ffffff80.
                std::cout << std::setfill('0') << std::setw(2)
                          << std::hex << (unsigned int)(unsigned char)data[char_nbr];
        }
        // std::hex is sticky; restore decimal so later integers printed on
        // std::cout (the counters in main(), sizes here) are not in hex.
        std::cout << std::dec << std::endl;
        int more = socket.get(zmq::sockopt::rcvmore); // Multipart detection
        if (!more)
            break; // Last message part
    }
}
// .split worker task | |
// Each worker task works on one request at a time and sends a random number | |
// of replies back, with random delays between replies: | |
class server_worker { | |
public: | |
server_worker(zmq::context_t& ctx, int sock_type) | |
: ctx_(ctx) | |
, worker_(ctx_, sock_type) | |
{ | |
} | |
int counter_rep = 0; | |
int counter_req = 0; | |
void work() | |
{ | |
worker_.connect("inproc://backend"); | |
try { | |
while (true) { | |
zmq::message_t identity; | |
zmq::message_t msg; | |
zmq::message_t copied_id; | |
zmq::message_t copied_msg; | |
worker_.recv(identity); | |
worker_.recv(msg); | |
counter_rep++; | |
std::cout << "Server worker: " << identity.str() << ", received : " << msg.str() << std::endl; | |
int replies = within(5); | |
for (int reply = 0; reply < replies; ++reply) { | |
std::this_thread::sleep_for(std::chrono_literals::operator""ms(within(1000) + 1)); | |
copied_id.copy(identity); | |
copied_msg.copy(msg); | |
std::cout << __LINE__ << " : " | |
<< "Server worker: " << identity.str() << ", send " | |
<< "reply " << reply << " : " << msg.str() << std::endl; | |
worker_.send(copied_id, zmq::send_flags::sndmore); | |
worker_.send(copied_msg, zmq::send_flags::none); | |
counter_req++; | |
} | |
} | |
} catch (std::exception& e) { | |
} | |
} | |
private: | |
zmq::context_t& ctx_; | |
zmq::socket_t worker_; | |
}; | |
// Non-owning pointers into one server_worker's counters, collected by
// server_task so main() can print per-thread totals. Valid only while the
// owning server_worker is alive.
struct counters {
    int* counter_rep;
    int* counter_req;
};
// .split server task | |
// This is our server task. | |
// It uses the multithreaded server model to deal requests out to a pool | |
// of workers and route replies back to clients. One worker can handle | |
// one request at a time but one client can talk to multiple workers at | |
// once. | |
class server_task { | |
public: | |
server_task() | |
: ctx_(1) | |
, frontend_(ctx_, ZMQ_ROUTER) | |
, backend_(ctx_, ZMQ_DEALER) | |
{ | |
} | |
enum { kMaxThread = 5 }; | |
void run() | |
{ | |
frontend_.bind("ipc:///tmp/feeds/0"); | |
backend_.bind("inproc://backend"); | |
std::vector<server_worker*> worker; | |
std::vector<std::thread*> worker_thread; | |
for (auto i = 0; i < kMaxThread; ++i) { | |
auto new_worker = new server_worker(ctx_, ZMQ_DEALER); | |
counters counter { &new_worker->counter_rep, &new_worker->counter_req }; | |
counters_.push_back(counter); | |
worker.push_back(new_worker); | |
worker_thread.push_back(new std::thread(std::bind(&server_worker::work, worker[i]))); | |
worker_thread[i]->detach(); | |
} | |
try { | |
zmq::proxy(frontend_, backend_); | |
} catch (std::exception& e) { | |
} | |
for (auto i = 0; i < kMaxThread; ++i) { | |
delete worker[i]; | |
delete worker_thread[i]; | |
} | |
} | |
std::vector<counters> counters_; | |
private: | |
std::vector<server_worker*> worker; | |
zmq::context_t ctx_; | |
zmq::socket_t frontend_; | |
zmq::socket_t backend_; | |
}; | |
// The main thread simply starts several clients and a server, and then | |
// waits for the server to finish. | |
using namespace std; | |
int main() | |
{ | |
cout << "Hello World!" << endl; | |
client_task ct1; | |
client_task ct2; | |
client_task ct3; | |
server_task st; | |
std::thread t1(std::bind(&client_task::start, &ct1)); | |
std::thread t2(std::bind(&client_task::start, &ct2)); | |
std::thread t3(std::bind(&client_task::start, &ct3)); | |
std::thread t4(std::bind(&server_task::run, &st)); | |
t1.detach(); | |
t2.detach(); | |
t3.detach(); | |
t4.detach(); | |
std::this_thread::sleep_for(std::chrono_literals::operator""s(60)); | |
// Attention, we can miss some requests this way | |
std::cout << "Client1: " << ct1.counter_rep << ":" << ct1.counter_req << std::endl; | |
std::cout << "Client2: " << ct2.counter_rep << ":" << ct2.counter_req << std::endl; | |
std::cout << "Client3: " << ct3.counter_rep << ":" << ct3.counter_req << std::endl; | |
std::cout << "All clients: " << ct3.counter_rep + ct1.counter_rep + ct2.counter_rep << ":" << ct1.counter_req + ct2.counter_req + ct3.counter_req << std::endl; | |
int counter_rep = 0, counter_req = 0; | |
for (const auto& counters : st.counters_) { | |
counter_rep += *counters.counter_rep; | |
counter_req += *counters.counter_req; | |
std::cout << "Server thread: " << *counters.counter_rep << ":" << *counters.counter_req << std::endl; | |
} | |
std::cout << "All server threads: " << counter_rep << ":" << counter_req << std::endl; | |
//getchar(); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment