@codebullies
Last active August 11, 2021 03:36
An example of CZMQ Multicast using PUB/SUB

To use the epgm:// or pgm:// scheme, you must configure libzmq 3.x (HEAD) with the --with-pgm option:

./configure --with-pgm
make
make install

Remember to set your LD_LIBRARY_PATH to the directory where libzmq was installed. For bash:

export LD_LIBRARY_PATH=/usr/local/lib

To compile publisher.c and subscriber.c (the libraries go after the source file so the linker can resolve their symbols):

gcc -o publisher publisher.c -lczmq -lzmq
gcc -o subscriber subscriber.c -lczmq -lzmq
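// publisher.c
#include <stdio.h>
#include <czmq.h>

int main (void)
{
    // create the zeromq context
    zctx_t *ctx = zctx_new ();
    void *pub = zsocket_new (ctx, ZMQ_PUB);
    zsocket_connect (pub, "epgm://192.168.10.114;239.192.1.1:5000");
    printf ("multicast initialized\n");
    while (!zctx_interrupted) {
        // first frame is the topic; zstr_sendm flags that more frames follow
        zstr_sendm (pub, "testing");
        // last frame carries the payload
        zstr_send (pub, "This is a test");
    }
    // destroying the context also closes its sockets
    zctx_destroy (&ctx);
    return 0;
}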
// subscriber.c
#include <stdio.h>
#include <czmq.h>

int main (void)
{
    // create the zeromq context
    zctx_t *ctx = zctx_new ();
    void *sub = zsocket_new (ctx, ZMQ_SUB);
    zsocket_connect (sub, "epgm://192.168.10.114;239.192.1.1:5000");
    zsocket_set_subscribe (sub, "testing");
    printf ("multicast initialized\n");
    while (!zctx_interrupted) {
        // zmsg_recv allocates the message; it returns NULL if interrupted
        zmsg_t *msg = zmsg_recv (sub);
        if (!msg)
            break;
        printf ("message size = %zu\n", zmsg_size (msg));
        int i = 0;
        zframe_t *frame;
        for (frame = zmsg_first (msg); frame; frame = zmsg_next (msg), i++) {
            // frame data is not NUL-terminated, so print it with an explicit length
            printf ("Data frame %d is %.*s\n", i,
                    (int) zframe_size (frame), (char *) zframe_data (frame));
        }
        zmsg_destroy (&msg);
    }
    zctx_destroy (&ctx);
    return 0;
}
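
Both programs pass the same endpoint string to zsocket_connect. For the pgm and epgm transports, that string has the form scheme://interface;multicast-address:port, where the interface can be a device name or a local IP address. As a rough sketch of how the parts fit together, the helper below assembles such a string with plain snprintf. The name format_epgm_endpoint is hypothetical and not part of CZMQ or libzmq; it is only an illustration.

```c
#include <assert.h>
#include <stdio.h>

/* Hypothetical helper (not a CZMQ or libzmq function): writes
   "epgm://<iface>;<group>:<port>" into buf.
   Returns 0 on success, -1 if buf is too small or formatting fails. */
static int
format_epgm_endpoint (char *buf, size_t len, const char *iface,
                      const char *group, int port)
{
    assert (buf && iface && group);
    int written = snprintf (buf, len, "epgm://%s;%s:%d", iface, group, port);
    if (written < 0 || (size_t) written >= len)
        return -1;              /* output would have been truncated */
    return 0;
}
```

Called with "192.168.10.114", "239.192.1.1" and 5000, this yields the endpoint used in publisher.c and subscriber.c, which could then be passed to zsocket_connect.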
porjo commented Jul 10, 2013

Thanks for the examples. A couple of suggestions for improvement:

  • Insert a sleep statement in the publisher's while loop so that it does not hog the CPU and flood the network, e.g. zclock_sleep (1000);
  • In the subscriber, I was getting glitchy output when passing the byte* to printf. I got more consistent output by creating a char* with zframe_strdup and passing that to printf, e.g.
while (!zctx_interrupted) {
    zmsg_t *msg = zmsg_recv (sub);
    if (!msg)
        break;
    // zmsg_pop hands ownership of each frame to the caller
    zframe_t *identity = zmsg_pop (msg);
    zframe_t *content = zmsg_pop (msg);
    char *identity_str = zframe_strdup (identity);
    char *content_str = zframe_strdup (content);

    printf ("identity: %s\n", identity_str);
    printf ("content: %s\n", content_str);

    free (identity_str);
    free (content_str);
    zframe_destroy (&identity);
    zframe_destroy (&content);
    zmsg_destroy (&msg);
}
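
The glitchy output described above most likely comes from the frame payload being a length-delimited byte buffer rather than a NUL-terminated string, so printf with %s can read past the end of the data. The sketch below shows, in plain C without CZMQ, the kind of copy a function like zframe_strdup has to perform. The helper name bytes_to_cstring is hypothetical and used only for illustration.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical helper: copies `size` raw bytes into a freshly allocated,
   NUL-terminated string. The caller frees the result.
   Returns NULL if the allocation fails. */
static char *
bytes_to_cstring (const unsigned char *data, size_t size)
{
    assert (data || size == 0);
    char *copy = malloc (size + 1);
    if (!copy)
        return NULL;
    if (size > 0)
        memcpy (copy, data, size);
    copy[size] = '\0';          /* the terminator the raw buffer lacks */
    return copy;
}
```

With a CZMQ frame, the inputs would be zframe_data (frame) and zframe_size (frame). In practice zframe_strdup already does this, so the helper is only meant to show why the raw pointer is unsafe to print directly.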

ggitau commented Oct 16, 2014

This uses the PLAIN mechanism. Could you do an example that uses CURVE?

@Panchatcharam
@ggitau, you can refer to this location for curve implementation but that might require some improvements.
