Skip to content

Instantly share code, notes, and snippets.

@op
Created December 22, 2011 18:11
Show Gist options
  • Save op/1511258 to your computer and use it in GitHub Desktop.
Save op/1511258 to your computer and use it in GitHub Desktop.
xpub/xsub assertion
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <zmq.h>
int main (int argc, char *argv[])
{
const char *msg[] = {"0", "1", "2"};
unsigned char buf[128];
int i, rc;
void *ctx = zmq_init(1);
void *pub = zmq_socket(ctx, ZMQ_PUB);
rc = zmq_bind(pub, "tcp://127.0.0.1:5561");
assert(rc == 0);
void *sub = zmq_socket(ctx, ZMQ_SUB);
rc = zmq_bind(sub, "tcp://127.0.0.1:5560");
assert(rc == 0);
rc = zmq_setsockopt(sub, ZMQ_SUBSCRIBE, "", 0);
assert(rc == 0);
zmq_pollitem_t items[1];
items[0].socket = sub;
items[0].events = ZMQ_POLLIN;
unsigned int x = 0;
for (x = 0; ; x++) {
if (x % 100000 == 0) {
printf("send %u\n", x);
}
// Publish messages.
rc = zmq_send(pub, msg[0], 1, ZMQ_SNDMORE);
assert(rc >= 0);
rc = zmq_send(pub, msg[1], 1, ZMQ_SNDMORE);
assert(rc >= 0);
rc = zmq_send(pub, msg[2], 1, 0);
assert(rc >= 0);
// Handle published messages.
while (1) {
rc = zmq_poll(items, 1, 0);
assert(rc >= 0);
if (rc == 0) {
break;
}
if (items[0].revents & ZMQ_POLLIN) {
int more = 1;
size_t more_size = sizeof(more);
for (i = 0; more; i++) {
rc = zmq_recv(sub, buf, sizeof(buf), 0);
assert(rc >= 0);
assert(i < sizeof(msg));
if (memcmp(msg[i], buf, rc) != 0) {
printf("recv %d: %.*s (%d)\n", i, rc, buf, rc);
assert(memcmp(msg[i], buf, rc) == 0);
}
rc = zmq_getsockopt(sub, ZMQ_RCVMORE, &more, &more_size);
assert(rc == 0);
}
assert(i == 3);
}
}
}
}
#include <assert.h>
#include <stddef.h>
#include <stdio.h>
#include <stdint.h>
#include <unistd.h>
#include <zmq.h>
int main (int argc, char *argv[])
{
char buf[128];
int rc;
int size;
void *ctx = zmq_init(1);
void *xpub = zmq_socket(ctx, ZMQ_XPUB);
rc = zmq_connect(xpub, "tcp://127.0.0.1:5560");
assert(rc == 0);
void *xsub = zmq_socket(ctx, ZMQ_XSUB);
rc = zmq_connect(xsub, "tcp://127.0.0.1:5561");
assert(rc == 0);
zmq_pollitem_t items[2];
items[0].socket = xpub;
items[0].events = ZMQ_POLLIN;
items[1].socket = xsub;
items[1].events = ZMQ_POLLIN;
unsigned int x;
for (x = 0; ; x++) {
if (x % 100000 == 0) {
printf("recv %u\n", x);
}
rc = zmq_poll(items, 2, -1);
assert (rc >= 0);
// Pass the subscription upstream through the device.
if (items[0].revents & ZMQ_POLLIN) {
printf("Handling subscriptions...\n");
rc = zmq_recv(xpub, buf, sizeof(buf), 0);
assert(rc >= 0);
size = rc;
rc = zmq_send(xsub, buf, size, 0);
assert(rc >= 0);
}
// Pass the published messages through the device.
if (items[1].revents & ZMQ_POLLIN) {
int i = 0;
int more;
size_t more_size = sizeof(more);
do {
rc = zmq_recv(xsub, buf, sizeof(buf), 0);
assert(rc >= 0);
size = rc;
rc = zmq_getsockopt(xsub, ZMQ_RCVMORE, &more, &more_size);
assert(rc == 0);
rc = zmq_send(xpub, buf, size, more ? ZMQ_SNDMORE : 0);
assert(rc >= 0);
i++;
} while (more);
if (i != 3) {
assert(i > 0);
printf("recv %d: %.*s (%d)\n", i, rc, buf, rc);
assert(i == 3);
}
}
}
}
@op
Copy link
Author

op commented Dec 22, 2011

If you run one pubsub and one xpubsub, everything is fine. Two or more xpubsub and frames will get lost.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment