ZeroMQ socket monitor test
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <zmq.h>

#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstring>
#include <iomanip>
#include <iostream>
#include <string>
#include <thread>
/// Returns the number of seconds elapsed since the first call to this function.
///
/// The reference point is captured lazily on the first call, so the first
/// call returns (approximately) 0. The result has millisecond resolution.
/// A monotonic clock is used so the value never goes backwards when the
/// system wall clock is adjusted while the test is running.
double elapsed() {
  using Clock = std::chrono::steady_clock;
  static const Clock::time_point start = Clock::now();
  const auto since_start =
      std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - start);
  return static_cast<double>(since_start.count()) / 1000;
}
void print_monitor_event(void* monitor) { | |
// First frame in message contains event number and value | |
zmq_msg_t msg; | |
zmq_msg_init(&msg); | |
if (zmq_msg_recv(&msg, monitor, ZMQ_DONTWAIT) == -1 && zmq_msg_more(&msg)) { | |
std::this_thread::sleep_for(std::chrono::milliseconds(10)); | |
return; | |
} | |
char* data = (char*)(zmq_msg_data(&msg)); | |
uint16_t event = *(uint16_t*)(data); | |
uint32_t value = *(uint32_t*)(data + 2); | |
// Second frame in message contains event address | |
zmq_msg_init(&msg); | |
if (zmq_msg_recv(&msg, monitor, ZMQ_DONTWAIT) == -1) { | |
std::this_thread::sleep_for(std::chrono::milliseconds(10)); | |
return; | |
} | |
data = (char*)(zmq_msg_data(&msg)); | |
size_t size = zmq_msg_size(&msg); | |
std::string address(data, size); | |
std::cout << "[" << elapsed() << "] [event] number: " << event | |
<< ", value: " << value << ", address: " << address << std::endl; | |
} | |
int main(int argc, char** argv) { | |
if (argc != 3) { | |
std::cout << "Usage: " << argv[0] << " [type] [address]" << std::endl; | |
return 1; | |
} | |
std::cout << std::fixed << std::setprecision(3); | |
std::string type(argv[1]); | |
std::string address(argv[2]); | |
if (type == "bind") { | |
void* ctx = zmq_ctx_new(); | |
// We'll monitor this socket | |
std::cout << "[" << elapsed() << "] [main] Create..." << std::endl; | |
void* pull = zmq_socket(ctx, ZMQ_PULL); | |
zmq_socket_monitor(pull, "inproc://monitor", ZMQ_EVENT_ALL); | |
// Create a socket for collecting monitor events | |
void* monitor_sock = zmq_socket(ctx, ZMQ_PAIR); | |
zmq_connect(monitor_sock, "inproc://monitor"); | |
// Start a monitor thread | |
std::atomic_flag flag = ATOMIC_FLAG_INIT; | |
flag.test_and_set(); | |
std::thread monitor_thread([monitor_sock, &flag]() { | |
while (flag.test_and_set()) print_monitor_event(monitor_sock); | |
}); | |
// Sleep 1s | |
std::this_thread::sleep_for(std::chrono::seconds(1)); | |
// Bind | |
std::cout << "[" << elapsed() << "] [main] Bind..." << std::endl; | |
zmq_bind(pull, address.c_str()); | |
// Sleep 6s | |
std::this_thread::sleep_for(std::chrono::seconds(6)); | |
// Unbind | |
std::cout << "[" << elapsed() << "] [main] Unbind..." << std::endl; | |
zmq_unbind(pull, address.c_str()); | |
// Sleep 1s | |
std::this_thread::sleep_for(std::chrono::seconds(1)); | |
// Close | |
std::cout << "[" << elapsed() << "] [main] Close..." << std::endl; | |
zmq_close(pull); | |
// Sleep 1s | |
std::this_thread::sleep_for(std::chrono::seconds(1)); | |
// Finish | |
std::cout << "[" << elapsed() << "] [main] Finish..." << std::endl; | |
flag.clear(); | |
monitor_thread.join(); | |
zmq_close(monitor_sock); | |
zmq_ctx_term(ctx); | |
return 0; | |
} else if (type == "connect") { | |
void* ctx = zmq_ctx_new(); | |
// We'll monitor this socket | |
std::cout << "[" << elapsed() << "] [main] Create..." << std::endl; | |
void* push = zmq_socket(ctx, ZMQ_PUSH); | |
zmq_socket_monitor(push, "inproc://monitor", ZMQ_EVENT_ALL); | |
// Create a socket for collecting monitor events | |
void* monitor_sock = zmq_socket(ctx, ZMQ_PAIR); | |
zmq_connect(monitor_sock, "inproc://monitor"); | |
// Start a monitor thread | |
std::atomic_flag flag = ATOMIC_FLAG_INIT; | |
flag.test_and_set(); | |
std::thread monitor_thread([monitor_sock, &flag]() { | |
while (flag.test_and_set()) print_monitor_event(monitor_sock); | |
}); | |
// Sleep 2s | |
std::this_thread::sleep_for(std::chrono::seconds(2)); | |
// Connect | |
std::cout << "[" << elapsed() << "] [main] Connect..." << std::endl; | |
zmq_connect(push, address.c_str()); | |
// Sleep 1s | |
std::this_thread::sleep_for(std::chrono::seconds(1)); | |
// Disconnect | |
std::cout << "[" << elapsed() << "] [main] Disconnect..." << std::endl; | |
zmq_disconnect(push, address.c_str()); | |
// Sleep 1s | |
std::this_thread::sleep_for(std::chrono::seconds(1)); | |
// Close | |
std::cout << "[" << elapsed() << "] [main] Close..." << std::endl; | |
zmq_close(push); | |
// Sleep 1s | |
std::this_thread::sleep_for(std::chrono::seconds(1)); | |
// Stop monitoring | |
flag.clear(); | |
monitor_thread.join(); | |
zmq_close(monitor_sock); | |
// Create new socket and restart monitor thread | |
std::cout << "[" << elapsed() << "] [main] Create..." << std::endl; | |
push = zmq_socket(ctx, ZMQ_PUSH); | |
zmq_socket_monitor(push, "inproc://monitor", ZMQ_EVENT_ALL); | |
monitor_sock = zmq_socket(ctx, ZMQ_PAIR); | |
zmq_connect(monitor_sock, "inproc://monitor"); | |
std::thread monitor_thread2([monitor_sock, &flag]() { | |
while (flag.test_and_set()) print_monitor_event(monitor_sock); | |
}); | |
// Sleep 1s | |
std::this_thread::sleep_for(std::chrono::seconds(1)); | |
// Connect | |
std::cout << "[" << elapsed() << "] [main] Connect..." << std::endl; | |
zmq_connect(push, address.c_str()); | |
// Sleep 3s | |
std::this_thread::sleep_for(std::chrono::seconds(3)); | |
// Close | |
std::cout << "[" << elapsed() << "] [main] Close..." << std::endl; | |
zmq_close(push); | |
// Finish | |
std::cout << "[" << elapsed() << "] [main] Finish..." << std::endl; | |
flag.clear(); | |
monitor_thread2.join(); | |
zmq_close(monitor_sock); | |
zmq_ctx_term(ctx); | |
return 0; | |
} else { | |
std::cerr << "Invalid type: " << type << std::endl; | |
return 1; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment