Skip to content

Instantly share code, notes, and snippets.

@hintjens

hintjens/issue188

Created Mar 31, 2011
Embed
What would you like to do?
pubsub.c
//
// Test multipart pub-sub
// Causes assertion failure in fw.cpp:62
//
#include "zmq.h"
#include <pthread.h>
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
//  Receive one message part from 'socket' and discard it.
//  Returns the zmq_recv result (zero on success, non-zero on failure).
//  The message is closed on both paths so a failed receive does not
//  leave an initialised zmq_msg_t behind.
static int
s_recv_discard (void *socket)
{
    zmq_msg_t part;
    zmq_msg_init (&part);
    int rc = zmq_recv (socket, &part, 0);
    zmq_msg_close (&part);
    return rc;
}

//  Read the ZMQ_RCVMORE option of 'socket' into '*more'.
//  Returns the zmq_getsockopt result; '*more' is zeroed first so it never
//  holds an indeterminate value when the call fails.
static int
s_rcvmore (void *socket, int64_t *more)
{
    size_t more_size = sizeof (*more);
    *more = 0;
    return zmq_getsockopt (socket, ZMQ_RCVMORE, more, &more_size);
}

//  Subscriber thread body; 'context' is the 0MQ context created by main.
//  Connects a SUB socket to inproc://test with an empty subscription
//  and reads two-part messages (routing part, then content part) until a
//  receive or option query fails, which is treated as context termination.
//  Asserts that the first part is flagged "more" and the second is not.
//  Always returns NULL.
static void *
subscriber_thread (void *context)
{
    void *sub = zmq_socket (context, ZMQ_SUB);
    assert (sub);
    int rc = zmq_setsockopt (sub, ZMQ_SUBSCRIBE, "", 0);
    assert (rc == 0);
    rc = zmq_connect (sub, "inproc://test");
    assert (rc == 0);

    while (1) {
        int64_t more;

        //  First part: routing, must be followed by another part
        if (s_recv_discard (sub))
            break;              //  Context terminated
        if (s_rcvmore (sub, &more))
            break;              //  Context terminated
        assert (more);

        //  Second part: content, must be the final part
        if (s_recv_discard (sub))
            break;              //  Context terminated
        if (s_rcvmore (sub, &more))
            break;              //  Context terminated
        assert (!more);

        //  Need to delay subscriber just enough for pub to close;
        //  the flushed progress dot is what provides that delay
        printf (".");
        fflush (stdout);
    }
    zmq_close (sub);
    return NULL;
}
//  Send 'text' (without its terminating NUL) on 'socket' as one message
//  part, passing 'flags' straight through to zmq_send.
static void
s_send_part (void *socket, const char *text, int flags)
{
    size_t size = strlen (text);
    zmq_msg_t part;
    int rc = zmq_msg_init_size (&part, size);
    assert (rc == 0);
    //  Copy the payload into the message; the original called memcmp here,
    //  which only compared the bytes and left the message data unwritten
    memcpy (zmq_msg_data (&part), text, size);
    rc = zmq_send (socket, &part, flags);
    assert (rc == 0);
    zmq_msg_close (&part);
}

//  Test driver: binds a PUB socket to inproc://test, starts the subscriber
//  thread, publishes 1000 two-part messages ("ROUTING" + "CONTENT"), then
//  closes the publisher, terminates the context and joins the thread.
//  Returns 0.
int main (void)
{
    void *context = zmq_init (1);
    assert (context);
    void *pub = zmq_socket (context, ZMQ_PUB);
    assert (pub);
    int rc = zmq_bind (pub, "inproc://test");
    assert (rc == 0);

    pthread_t thread;
    rc = pthread_create (&thread, NULL, subscriber_thread, context);
    assert (rc == 0);
    //  Presumably gives the subscriber time to connect before publishing
    sleep (1);

    for (int i = 0; i < 1000; i++) {
        s_send_part (pub, "ROUTING", ZMQ_SNDMORE);
        s_send_part (pub, "CONTENT", 0);
    }
    zmq_close (pub);
    //  Terminating the context makes the subscriber's receive fail, which
    //  is how the subscriber thread leaves its loop
    zmq_term (context);
    pthread_join (thread, NULL);
    return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.