Skip to content

Instantly share code, notes, and snippets.

@deepcube
Created May 24, 2013 16:49
Show Gist options
  • Save deepcube/5644856 to your computer and use it in GitHub Desktop.
Save deepcube/5644856 to your computer and use it in GitHub Desktop.
zeromq broker. Frontend client prepends messages with the ID of the final destination. Broker receives message through a ZMQ_ROUTER so message now has initial source ID prepended. Broker swaps first two frames and sends message out backend ZMQ_ROUTER, which routes message to final destination. Clients used as destinations must identify to broker…
#include <czmq.h>
#include <stdlib.h>
#include <zmq.h>
#include "zerr.h"
/*
 * zloop poll callback: relay one message from item->socket to the socket
 * passed in arg, exchanging the first two frames (the "from" and "to"
 * addresses) on the way. For the front_end poller arg is back_end, and
 * vice versa.
 *
 * Frame ownership:
 *   - If a zmsg_push() succeeds, the frame is owned by the message and will
 *     be destroyed along with the message on zmsg_send().
 *   - If a zmsg_push() fails, we still own the frame and must destroy it.
 *   That is why each frame pointer is cleared right after its successful
 *   push: the cleanup path then only touches frames we still own.
 *   (I asked on the zmq mailing list about this, and will submit a patch so
 *   these calls nullify the reference.)
 *
 * Returns 0 on both the success and the failure path; see the note at the
 * final return.
 *
 * TODO: Log errors to syslog, log EHOSTUNREACH errors once per unreachable
 *       host until reconnect.
 *       Find out if any of these errors should be fatal.
 */
int msg_handler(zloop_t *loop, zmq_pollitem_t *item, void *arg)
{
    zmsg_t   *msg = NULL;
    zframe_t *src = NULL;
    zframe_t *dst = NULL;
    void     *out = arg;

    /* Receive, and make sure both address frames are present. */
    zcheck_goto(cleanup, msg = zmsg_recv(item->socket), "zmsg_recv failed");
    zcheck_goto(cleanup, 2 <= zmsg_size(msg), "message too small (need 2 address frames)");

    /* Take the two leading frames off the message. */
    zcheck_goto(cleanup, src = zmsg_pop(msg), "zmsg_pop failed to pop source address");
    zcheck_goto(cleanup, dst = zmsg_pop(msg), "zmsg_pop failed to pop destination address");

    /*
     * Push them back in the opposite order so the destination ends up first.
     * After each successful push the message owns the frame, so drop our
     * reference to keep cleanup from destroying it a second time.
     */
    zcheck_goto(cleanup, 0 == zmsg_push(msg, src), "zmsg_push failed to push source address");
    src = NULL;
    zcheck_goto(cleanup, 0 == zmsg_push(msg, dst), "zmsg_push failed to push destination address");
    dst = NULL;

    zcheck_goto(cleanup, 0 == zmsg_send(&msg, out), "zmsg_send failed");
    return 0;

cleanup:
    /* Dump and release whatever we still own at the point of failure. */
    if (src) {
        zframe_print(src, "source addr:");
        zframe_destroy(&src);
    }
    if (dst) {
        zframe_print(dst, "dest addr:");
        zframe_destroy(&dst);
    }
    if (msg) {
        zmsg_dump(msg);
        zmsg_destroy(&msg);
    }
    /* returning -1 causes the zloop to exit. we want to complain, but continue running */
    return 0;
}
/*
 * Broker entry point.
 *
 * Usage: <prog> <protocol://front_address> <protocol://back_address> <broker_id>
 *
 *   argv[1]  endpoint the front_end ROUTER socket binds to
 *   argv[2]  endpoint the back_end ROUTER socket binds to
 *   argv[3]  identity assigned to both sockets
 *
 * Sets up the two ROUTER sockets, registers msg_handler on each with the
 * opposite socket as the forwarding target, and runs the zloop reactor.
 *
 * Returns EXIT_SUCCESS if zloop_start() returns 0, EXIT_FAILURE if it returns
 * non-zero or if any setup step fails.
 */
int main(int argc, char **argv)
{
    zctx_t         *ctx        = NULL;
    void           *front_end  = NULL;
    void           *back_end   = NULL;
    zloop_t        *loop       = NULL;
    /* {0} rather than {}: an empty initializer list is not valid ISO C before C23. */
    zmq_pollitem_t  front_poll = {0};
    zmq_pollitem_t  back_poll  = {0};
    int             status     = EXIT_FAILURE;

    /* NOTE(review): zcheck_err comes from zerr.h; presumably it reports the
     * usage message and bails out with EXIT_FAILURE when argc != 4 - confirm. */
    zcheck_err(EXIT_FAILURE, argc == 4, "Usage: %s <protocol://front_address> <protocol://back_address> <broker_id>", argv[0]);

    zcheck_goto(cleanup, ctx       = zctx_new()                  , "zctx_new failed to create zeromq context"     );
    zcheck_goto(cleanup, front_end = zsocket_new(ctx, ZMQ_ROUTER), "zsocket_new failed to create front_end socket");
    zcheck_goto(cleanup, back_end  = zsocket_new(ctx, ZMQ_ROUTER), "zsocket_new failed to create back_end socket" );
    zcheck_goto(cleanup, loop      = zloop_new()                 , "zloop_new failed"                             );

    /* Socket options are applied before binding. Both sockets share the
     * broker identity given on the command line. */
    zsocket_set_router_mandatory(front_end, 1);
    zsocket_set_router_mandatory(back_end , 1);
    zsocket_set_identity(front_end, argv[3]);
    zsocket_set_identity(back_end , argv[3]);

    front_poll = (zmq_pollitem_t){ .socket = front_end, .fd = 0, .events = ZMQ_POLLIN, .revents = 0 };
    back_poll  = (zmq_pollitem_t){ .socket = back_end , .fd = 0, .events = ZMQ_POLLIN, .revents = 0 };

    //TODO: use zeromq's perf tools to test IPC vs TCP for back_end
    zcheck_goto(cleanup, 0 <= zsocket_bind(front_end, argv[1]), "zsocket_bind failed to bind front_end socket");
    zcheck_goto(cleanup, 0 <= zsocket_bind(back_end , argv[2]), "zsocket_bind failed to bind back_end socket" );

    /* Each poller forwards to the opposite socket: front -> back, back -> front. */
    zcheck_goto(cleanup, 0 == zloop_poller(loop, &front_poll, msg_handler, back_end ), "zloop_poller failed to register front_end handler");
    zcheck_goto(cleanup, 0 == zloop_poller(loop, &back_poll , msg_handler, front_end), "zloop_poller failed to register back_end handler" );

    status = (zloop_start(loop) ? EXIT_FAILURE : EXIT_SUCCESS);

cleanup:
    /* The sockets are not destroyed individually here; only the loop and the
     * context they were created from are torn down. */
    if (loop) zloop_destroy(&loop);
    if (ctx ) zctx_destroy(&ctx);
    return (status);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment