Skip to content

Instantly share code, notes, and snippets.

@dfukunaga
Created April 5, 2017 12:54
Show Gist options
  • Save dfukunaga/191c2f7f3d49da2800343f74ad1ba992 to your computer and use it in GitHub Desktop.
Save dfukunaga/191c2f7f3d49da2800343f74ad1ba992 to your computer and use it in GitHub Desktop.
#include <zmq.h>
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <iostream>
#include <string>
#include <thread>
int main(int argc, char** argv) {
if (argc != 3) {
std::cout << "Usage: " << argv[0] << " [type] [address]" << std::endl;
return 1;
}
std::string type(argv[1]);
std::string address(argv[2]);
void* ctx = zmq_ctx_new();
if (type == "bind") {
void* pull = zmq_socket(ctx, ZMQ_PULL);
zmq_bind(pull, "tcp://*:2551"); // bind to remote
zmq_bind(pull, "inproc://2551"); // bind to local
zmq_socket_monitor(pull, "inproc://monitor", ZMQ_EVENT_ALL);
void* monitor = zmq_socket(ctx, ZMQ_PAIR);
zmq_connect(monitor, "inproc://monitor");
void* push = zmq_socket(ctx, ZMQ_PUSH);
zmq_connect(push, "inproc://2551"); // connect to local
// launch receive thread
std::thread th([pull, monitor]() {
for (int i = 0; i < 30; ++i) {
zmq_pollitem_t items[] = {{pull, 0, ZMQ_POLLIN, 0},
{monitor, 0, ZMQ_POLLIN, 0}};
zmq_poll(items, 2, 1000);
if (items[0].revents & ZMQ_POLLIN) {
char msg[64];
int len = zmq_recv(pull, msg, 64, 0);
msg[len] = '\0';
std::cout << "receive " << msg << std::endl;
}
if (items[1].revents & ZMQ_POLLIN) {
uint16_t event;
zmq_recv(monitor, &event, 2, 0);
char address[64];
int len = zmq_recv(monitor, address, 64, 0);
address[len] = '\0';
std::cout << "event " << event << " " << address << std::endl;
}
}
});
std::cout << "send messages..." << std::endl;
for (int i = 1; i <= 10; ++i) {
char msg[64];
int len = std::sprintf(msg, "from local (%d/10)", i);
zmq_send(push, msg, len, 0);
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
std::cout << "close socket" << std::endl;
zmq_close(push);
// join receive thread
th.join();
zmq_close(pull);
zmq_close(monitor);
} else if (type == "connect") {
void* push = zmq_socket(ctx, ZMQ_PUSH);
zmq_connect(push,
("tcp://" + address + ":2551").c_str()); // connect to remote
std::cout << "send messages..." << std::endl;
for (int i = 1; i <= 10; ++i) {
char msg[64];
int len = std::sprintf(msg, "from remote (%d/10)", i);
zmq_send(push, msg, len, 0);
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
std::cout << "close socket" << std::endl;
zmq_close(push);
}
zmq_ctx_term(ctx);
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment