Skip to content

Instantly share code, notes, and snippets.

@mschubert
Created May 14, 2023 08:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mschubert/fb6191e8ef273db935cde3928fc13232 to your computer and use it in GitHub Desktop.
Save mschubert/fb6191e8ef273db935cde3928fc13232 to your computer and use it in GitHub Desktop.
ZeroMQ test ZMQ_ROUTER_NOTIFY draft option
// gcc -DZMQ_BUILD_DRAFT_API=ON -lzmq -o test_monitor test_monitor.c
// ./test_monitor
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <zmq.h>
void recv_print(void *frontend) {
zmq_msg_t message;
while (1) {
zmq_msg_init(&message);
zmq_msg_recv(&message, frontend, 0);
char *content = strndup((char*)zmq_msg_data(&message), zmq_msg_size(&message));
printf("Frame: (%i) %s\n", zmq_msg_size(&message), content);
int more = zmq_msg_more(&message);
zmq_msg_close(&message);
if (!more) break;
}
printf("\n");
}
void direct() {
char *sid = "client";
int opt = ZMQ_NOTIFY_CONNECT;
void *context = zmq_ctx_new();
void *frontend = zmq_socket(context, ZMQ_ROUTER);
void *backend = zmq_socket(context, ZMQ_REQ);
zmq_setsockopt(backend, ZMQ_ROUTING_ID, sid, strlen(sid));
zmq_setsockopt(frontend, ZMQ_ROUTER_NOTIFY, &opt, sizeof(opt));
zmq_bind(frontend, "tcp://*:5555");
zmq_connect(backend, "tcp://localhost:5555");
char *greet = "hello";
zmq_send(backend, strdup(greet), strlen(greet), 0);
recv_print(frontend);
recv_print(frontend);
}
void fwd(void *fwd_in, void *fwd_out) {
zmq_msg_t message;
while (1) {
zmq_msg_init(&message);
zmq_msg_recv(&message, fwd_in, 0);
int more = zmq_msg_more(&message);
zmq_msg_send(&message, fwd_out, more ? ZMQ_SNDMORE : 0);
zmq_msg_close(&message);
if (!more) break;
}
}
void proxied() {
char *sid = "client";
char *pid = "proxy";
int opt = ZMQ_NOTIFY_CONNECT;
void *context = zmq_ctx_new();
void *frontend = zmq_socket(context, ZMQ_ROUTER);
void *backend = zmq_socket(context, ZMQ_REQ);
void *fwd_in = zmq_socket(context, ZMQ_ROUTER);
void *fwd_out = zmq_socket(context, ZMQ_DEALER);
zmq_setsockopt(backend, ZMQ_ROUTING_ID, sid, strlen(sid)+1);
zmq_setsockopt(fwd_out, ZMQ_ROUTING_ID, pid, strlen(pid)+1);
zmq_setsockopt(fwd_in, ZMQ_ROUTER_NOTIFY, &opt, sizeof(opt));
zmq_setsockopt(frontend, ZMQ_ROUTER_NOTIFY, &opt, sizeof(opt));
zmq_bind(frontend, "tcp://*:9955");
zmq_bind(fwd_in, "tcp://*:9988");
zmq_connect(fwd_out, "tcp://localhost:9955");
zmq_connect(backend, "tcp://localhost:9988");
char *greet = "hello";
zmq_send(backend, strdup(greet), strlen(greet)+1, 0);
recv_print(frontend);
fwd(fwd_in, fwd_out);
recv_print(frontend);
fwd(fwd_in, fwd_out);
recv_print(frontend);
}
int main() {
printf("* DIRECT *\n");
direct();
printf("* PROXIED *\n");
proxied();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment