Last active
April 28, 2020 04:00
-
-
Save dfukunaga/0ae622d977b780012edb492c25ff0bac to your computer and use it in GitHub Desktop.
zeromq heartbeat test
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <zmq.h>

#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <iostream>
#include <string>
#include <thread>
class socket_monitor { | |
public: | |
socket_monitor(void* ctx, void* socket) { | |
// Start monitoring | |
zmq_socket_monitor(socket, "inproc://monitor", ZMQ_EVENT_ALL); | |
// Create a socket for collecting monitor events | |
this->monitor_sock = zmq_socket(ctx, ZMQ_PAIR); | |
zmq_connect(this->monitor_sock, "inproc://monitor"); | |
// Start a monitor thread | |
this->flag.test_and_set(); | |
this->thread = std::thread([this]() { | |
while (this->flag.test_and_set()) print_monitor_event(); | |
}); | |
} | |
~socket_monitor() { | |
// Stop the monitor thread | |
this->flag.clear(); | |
this->thread.join(); | |
zmq_close(this->monitor_sock); | |
} | |
private: | |
std::thread thread; | |
std::atomic_flag flag = ATOMIC_FLAG_INIT; | |
void* monitor_sock = NULL; | |
void print_monitor_event() { | |
// First frame in message contains event number and value | |
zmq_msg_t msg; | |
zmq_msg_init(&msg); | |
if (zmq_msg_recv(&msg, this->monitor_sock, ZMQ_DONTWAIT) == -1 && | |
zmq_msg_more(&msg)) { | |
std::this_thread::sleep_for(std::chrono::milliseconds(10)); | |
return; | |
} | |
char* data = (char*)(zmq_msg_data(&msg)); | |
uint16_t event = *(uint16_t*)(data); | |
uint32_t value = *(uint32_t*)(data + 2); | |
// Second frame in message contains event address | |
zmq_msg_init(&msg); | |
if (zmq_msg_recv(&msg, this->monitor_sock, ZMQ_DONTWAIT) == -1) { | |
std::this_thread::sleep_for(std::chrono::milliseconds(10)); | |
return; | |
} | |
data = (char*)(zmq_msg_data(&msg)); | |
size_t size = zmq_msg_size(&msg); | |
std::string address(data, size); | |
std::cout << "[event] number: " << event << ", value: " << value | |
<< ", address: " << address << std::endl; | |
} | |
}; | |
int main(int argc, char** argv) { | |
if (argc != 3) { | |
std::cout << "Usage: " << argv[0] << " [address]" << std::endl; | |
return 1; | |
} | |
std::string type(argv[1]); // bind or connect | |
std::string address(argv[2]); // ex. tcp://127.0.0.1:2551 | |
void* ctx = zmq_ctx_new(); | |
// We'll monitor this socket | |
std::cout << "[main] Create..." << std::endl; | |
int sock_type = type == "bind" ? ZMQ_PULL : ZMQ_PUSH; | |
void* sock = zmq_socket(ctx, sock_type); | |
auto monitor = new socket_monitor(ctx, sock); | |
if (type == "bind") { | |
// Bind | |
std::cout << "[main] Bind..." << std::endl; | |
zmq_bind(sock, address.c_str()); | |
// Block | |
getchar(); | |
// Unbind | |
std::cout << "[main] Unbind..." << std::endl; | |
zmq_unbind(sock, address.c_str()); | |
} else { | |
// Heartbeat settings | |
int interval = 100, timeout = 1000, ttl = 1000; | |
zmq_setsockopt(sock, ZMQ_HEARTBEAT_IVL, &interval, sizeof(int)); | |
zmq_setsockopt(sock, ZMQ_HEARTBEAT_TIMEOUT, &timeout, sizeof(int)); | |
zmq_setsockopt(sock, ZMQ_HEARTBEAT_TTL, &ttl, sizeof(int)); | |
zmq_setsockopt(sock, ZMQ_HANDSHAKE_IVL, &timeout, sizeof(int)); | |
// Connect | |
std::cout << "[main] Connect..." << std::endl; | |
zmq_connect(sock, address.c_str()); | |
// Block | |
getchar(); | |
// Disconnect | |
std::cout << "[main] Disconnect..." << std::endl; | |
zmq_disconnect(sock, address.c_str()); | |
} | |
// Block | |
getchar(); | |
// Finish | |
std::cout << "[main] Finish..." << std::endl; | |
delete monitor; | |
zmq_close(sock); | |
zmq_ctx_term(ctx); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment