Skip to content

Instantly share code, notes, and snippets.

@dfukunaga
Last active April 28, 2020 04:00
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dfukunaga/0ae622d977b780012edb492c25ff0bac to your computer and use it in GitHub Desktop.
Save dfukunaga/0ae622d977b780012edb492c25ff0bac to your computer and use it in GitHub Desktop.
zeromq heartbeat test
#include <zmq.h>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <iostream>
#include <string>
#include <thread>
class socket_monitor {
public:
socket_monitor(void* ctx, void* socket) {
// Start monitoring
zmq_socket_monitor(socket, "inproc://monitor", ZMQ_EVENT_ALL);
// Create a socket for collecting monitor events
this->monitor_sock = zmq_socket(ctx, ZMQ_PAIR);
zmq_connect(this->monitor_sock, "inproc://monitor");
// Start a monitor thread
this->flag.test_and_set();
this->thread = std::thread([this]() {
while (this->flag.test_and_set()) print_monitor_event();
});
}
~socket_monitor() {
// Stop the monitor thread
this->flag.clear();
this->thread.join();
zmq_close(this->monitor_sock);
}
private:
std::thread thread;
std::atomic_flag flag = ATOMIC_FLAG_INIT;
void* monitor_sock = NULL;
void print_monitor_event() {
// First frame in message contains event number and value
zmq_msg_t msg;
zmq_msg_init(&msg);
if (zmq_msg_recv(&msg, this->monitor_sock, ZMQ_DONTWAIT) == -1 &&
zmq_msg_more(&msg)) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
return;
}
char* data = (char*)(zmq_msg_data(&msg));
uint16_t event = *(uint16_t*)(data);
uint32_t value = *(uint32_t*)(data + 2);
// Second frame in message contains event address
zmq_msg_init(&msg);
if (zmq_msg_recv(&msg, this->monitor_sock, ZMQ_DONTWAIT) == -1) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
return;
}
data = (char*)(zmq_msg_data(&msg));
size_t size = zmq_msg_size(&msg);
std::string address(data, size);
std::cout << "[event] number: " << event << ", value: " << value
<< ", address: " << address << std::endl;
}
};
int main(int argc, char** argv) {
if (argc != 3) {
std::cout << "Usage: " << argv[0] << " [address]" << std::endl;
return 1;
}
std::string type(argv[1]); // bind or connect
std::string address(argv[2]); // ex. tcp://127.0.0.1:2551
void* ctx = zmq_ctx_new();
// We'll monitor this socket
std::cout << "[main] Create..." << std::endl;
int sock_type = type == "bind" ? ZMQ_PULL : ZMQ_PUSH;
void* sock = zmq_socket(ctx, sock_type);
auto monitor = new socket_monitor(ctx, sock);
if (type == "bind") {
// Bind
std::cout << "[main] Bind..." << std::endl;
zmq_bind(sock, address.c_str());
// Block
getchar();
// Unbind
std::cout << "[main] Unbind..." << std::endl;
zmq_unbind(sock, address.c_str());
} else {
// Heartbeat settings
int interval = 100, timeout = 1000, ttl = 1000;
zmq_setsockopt(sock, ZMQ_HEARTBEAT_IVL, &interval, sizeof(int));
zmq_setsockopt(sock, ZMQ_HEARTBEAT_TIMEOUT, &timeout, sizeof(int));
zmq_setsockopt(sock, ZMQ_HEARTBEAT_TTL, &ttl, sizeof(int));
zmq_setsockopt(sock, ZMQ_HANDSHAKE_IVL, &timeout, sizeof(int));
// Connect
std::cout << "[main] Connect..." << std::endl;
zmq_connect(sock, address.c_str());
// Block
getchar();
// Disconnect
std::cout << "[main] Disconnect..." << std::endl;
zmq_disconnect(sock, address.c_str());
}
// Block
getchar();
// Finish
std::cout << "[main] Finish..." << std::endl;
delete monitor;
zmq_close(sock);
zmq_ctx_term(ctx);
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment