Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
ZeroMQ socket monitor test
#include <zmq.h>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstring>
#include <iomanip>
#include <iostream>
#include <string>
#include <thread>
/// Returns the time elapsed since the first call to this function.
///
/// @return Seconds since the first call, truncated to millisecond resolution.
///         The first call establishes the reference point.
double elapsed() {
  // steady_clock is monotonic; system_clock may jump when the wall clock is
  // adjusted, which would make reported intervals wrong or even negative.
  static const auto begin = std::chrono::steady_clock::now();
  const auto now = std::chrono::steady_clock::now();
  const auto ms =
      std::chrono::duration_cast<std::chrono::milliseconds>(now - begin);
  return static_cast<double>(ms.count()) / 1000;
}
void print_monitor_event(void* monitor) {
// First frame in message contains event number and value
zmq_msg_t msg;
zmq_msg_init(&msg);
if (zmq_msg_recv(&msg, monitor, ZMQ_DONTWAIT) == -1 && zmq_msg_more(&msg)) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
return;
}
char* data = (char*)(zmq_msg_data(&msg));
uint16_t event = *(uint16_t*)(data);
uint32_t value = *(uint32_t*)(data + 2);
// Second frame in message contains event address
zmq_msg_init(&msg);
if (zmq_msg_recv(&msg, monitor, ZMQ_DONTWAIT) == -1) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
return;
}
data = (char*)(zmq_msg_data(&msg));
size_t size = zmq_msg_size(&msg);
std::string address(data, size);
std::cout << "[" << elapsed() << "] [event] number: " << event
<< ", value: " << value << ", address: " << address << std::endl;
}
int main(int argc, char** argv) {
if (argc != 3) {
std::cout << "Usage: " << argv[0] << " [type] [address]" << std::endl;
return 1;
}
std::cout << std::fixed << std::setprecision(3);
std::string type(argv[1]);
std::string address(argv[2]);
if (type == "bind") {
void* ctx = zmq_ctx_new();
// We'll monitor this socket
std::cout << "[" << elapsed() << "] [main] Create..." << std::endl;
void* pull = zmq_socket(ctx, ZMQ_PULL);
zmq_socket_monitor(pull, "inproc://monitor", ZMQ_EVENT_ALL);
// Create a socket for collecting monitor events
void* monitor_sock = zmq_socket(ctx, ZMQ_PAIR);
zmq_connect(monitor_sock, "inproc://monitor");
// Start a monitor thread
std::atomic_flag flag = ATOMIC_FLAG_INIT;
flag.test_and_set();
std::thread monitor_thread([monitor_sock, &flag]() {
while (flag.test_and_set()) print_monitor_event(monitor_sock);
});
// Sleep 1s
std::this_thread::sleep_for(std::chrono::seconds(1));
// Bind
std::cout << "[" << elapsed() << "] [main] Bind..." << std::endl;
zmq_bind(pull, address.c_str());
// Sleep 6s
std::this_thread::sleep_for(std::chrono::seconds(6));
// Unbind
std::cout << "[" << elapsed() << "] [main] Unbind..." << std::endl;
zmq_unbind(pull, address.c_str());
// Sleep 1s
std::this_thread::sleep_for(std::chrono::seconds(1));
// Close
std::cout << "[" << elapsed() << "] [main] Close..." << std::endl;
zmq_close(pull);
// Sleep 1s
std::this_thread::sleep_for(std::chrono::seconds(1));
// Finish
std::cout << "[" << elapsed() << "] [main] Finish..." << std::endl;
flag.clear();
monitor_thread.join();
zmq_close(monitor_sock);
zmq_ctx_term(ctx);
return 0;
} else if (type == "connect") {
void* ctx = zmq_ctx_new();
// We'll monitor this socket
std::cout << "[" << elapsed() << "] [main] Create..." << std::endl;
void* push = zmq_socket(ctx, ZMQ_PUSH);
zmq_socket_monitor(push, "inproc://monitor", ZMQ_EVENT_ALL);
// Create a socket for collecting monitor events
void* monitor_sock = zmq_socket(ctx, ZMQ_PAIR);
zmq_connect(monitor_sock, "inproc://monitor");
// Start a monitor thread
std::atomic_flag flag = ATOMIC_FLAG_INIT;
flag.test_and_set();
std::thread monitor_thread([monitor_sock, &flag]() {
while (flag.test_and_set()) print_monitor_event(monitor_sock);
});
// Sleep 2s
std::this_thread::sleep_for(std::chrono::seconds(2));
// Connect
std::cout << "[" << elapsed() << "] [main] Connect..." << std::endl;
zmq_connect(push, address.c_str());
// Sleep 1s
std::this_thread::sleep_for(std::chrono::seconds(1));
// Disconnect
std::cout << "[" << elapsed() << "] [main] Disconnect..." << std::endl;
zmq_disconnect(push, address.c_str());
// Sleep 1s
std::this_thread::sleep_for(std::chrono::seconds(1));
// Close
std::cout << "[" << elapsed() << "] [main] Close..." << std::endl;
zmq_close(push);
// Sleep 1s
std::this_thread::sleep_for(std::chrono::seconds(1));
// Stop monitoring
flag.clear();
monitor_thread.join();
zmq_close(monitor_sock);
// Create new socket and restart monitor thread
std::cout << "[" << elapsed() << "] [main] Create..." << std::endl;
push = zmq_socket(ctx, ZMQ_PUSH);
zmq_socket_monitor(push, "inproc://monitor", ZMQ_EVENT_ALL);
monitor_sock = zmq_socket(ctx, ZMQ_PAIR);
zmq_connect(monitor_sock, "inproc://monitor");
std::thread monitor_thread2([monitor_sock, &flag]() {
while (flag.test_and_set()) print_monitor_event(monitor_sock);
});
// Sleep 1s
std::this_thread::sleep_for(std::chrono::seconds(1));
// Connect
std::cout << "[" << elapsed() << "] [main] Connect..." << std::endl;
zmq_connect(push, address.c_str());
// Sleep 3s
std::this_thread::sleep_for(std::chrono::seconds(3));
// Close
std::cout << "[" << elapsed() << "] [main] Close..." << std::endl;
zmq_close(push);
// Finish
std::cout << "[" << elapsed() << "] [main] Finish..." << std::endl;
flag.clear();
monitor_thread2.join();
zmq_close(monitor_sock);
zmq_ctx_term(ctx);
return 0;
} else {
std::cerr << "Invalid type: " << type << std::endl;
return 1;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment