Skip to content

Instantly share code, notes, and snippets.

@hintjens
Created September 20, 2013 06:41
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hintjens/6634071 to your computer and use it in GitHub Desktop.
Save hintjens/6634071 to your computer and use it in GitHub Desktop.
Multistage blackbox test
#include <czmq.h>
static void *
s_source (void *args)
{
zctx_t *ctx = zctx_new ();
void *pub = zsocket_new (ctx, ZMQ_PUB);
zsocket_bind (pub, "tcp://127.0.0.1:9000");
while (true) {
zstr_send (pub, "%02x", randof (0x100));
zclock_sleep (100);
}
zctx_destroy (&ctx);
return NULL;
}
static void *
s_filter (void *args)
{
zctx_t *ctx = zctx_new ();
void *push = zsocket_new (ctx, ZMQ_PUSH);
zsocket_bind (push, "tcp://127.0.0.1:9001");
void *sub = zsocket_new (ctx, ZMQ_SUB);
zsocket_connect (sub, "tcp://localhost:9000");
zsocket_set_subscribe (sub, "0");
zsocket_set_subscribe (sub, "7");
while (true) {
zmsg_t *msg = zmsg_recv (sub);
zmsg_send (&msg, push);
}
zctx_destroy (&ctx);
return NULL;
}
static void *
s_sink (void *args)
{
zctx_t *ctx = zctx_new ();
void *pull = zsocket_new (ctx, ZMQ_PULL);
zsocket_connect (pull, "tcp://localhost:9001");
while (true) {
char *message = zstr_recv (pull);
puts (message);
free (message);
}
zctx_destroy (&ctx);
return NULL;
}
int main (void)
{
zthread_new (s_source, NULL);
zthread_new (s_filter, NULL);
zthread_new (s_sink, NULL);
sleep (100);
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment