Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
ZeroMQ Blocking recv() Bug Reproduction
// 0MQ "data ready" + recv() block bug by Gregory Szorc <gregory.szorc@gmail.com>
//
// This file produces a bug where a 0MQ socket says it has a message
// available but the call to recv() blocks. This should not happen.
//
// Reproduce conditions:
//
// Reproduces on Linux. Does not reproduce on Windows.
//
// For me, this reproduces on x86_64 Linux most of the time, but not always.
// I'm not sure if the threading matters. I reproduced the bug from a
// program with threading, so I included the threading just because.
//
// Gist:
//
// 1) Create a PULL socket and bind via inproc://
// 2) Create a PUSH socket and connect() it to the PULL socket from step 1
// 3) Send a message on PUSH
// 4) Create a new thread that sends one message to the PULL socket and then
// connect()s a SUB socket to an inproc:// endpoint that has not been bound.
// That connect() throws an error. We do this in a separate thread, but I'm
// not sure if that matters.
// 5) Create a new thread and connect() to the main thread's PULL socket and
// start writing messages.
// 6) In a poll loop on the main thread, the PULL socket is marked as
// ready for reading, but recv() blocks.
//
// Instructions:
//
// g++ -pthread -I/path/to/zmq/include -o zmqbug zmq_bug.cpp -lzmq
//
// You should see some debugging output as the program runs. Eventually, it
// will freeze at "attempting to recv() from pair_pull". At this point,
// the bug has manifested. For me, if it hangs, it will hang in the first 3
// seconds. If it runs longer than that, no repro. The test can likely
// be reduced to something simpler and more reliable.
//
// The stack on my machine at this point looks like:
//
// #0 0x00007ffff76ec6cc in __libc_recv (fd=15, buf=<value optimized out>, n=<value optimized out>, flags=<value optimized out>) at ../sysdeps/unix/sysv/linux/x86_64/recv.c:34
// #1 0x00007ffff7bc5489 in recv (this=<value optimized out>, cmd_=0x7fffffffe3a0, block_=<value optimized out>) at /usr/include/bits/socket2.h:45
// #2 zmq::signaler_t::recv (this=<value optimized out>, cmd_=0x7fffffffe3a0, block_=<value optimized out>) at signaler.cpp:263
// #3 0x00007ffff7bb6264 in zmq::app_thread_t::process_commands (this=0x606800, block_=<value optimized out>, throttle_=<value optimized out>) at app_thread.cpp:88
// #4 0x00007ffff7bc5aaf in zmq::socket_base_t::recv(struct {...} *, int) (this=0x606850, msg_=0x7fffffffe500, flags_=0) at socket_base.cpp:435
// #5 0x000000000040263b in zmq::socket_t::recv(zmq::message_t*, int) ()
// #6 0x0000000000401bea in main ()
//
// This was built against ZeroMQ 2.0.10 (53d1677c8b0f85e309c6a067b47c80dedcffb5aa)
#include <assert.h>
#include <pthread.h>
#include <stdint.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include <iostream>
#include <zmq.hpp>
using std::cout;
using std::endl;

// inproc:// endpoints shared by every thread in this test. Nothing in this
// program binds pub_endpoint; connecting to it is the step expected to fail.
// Both pointers are const so no thread can retarget them.
const char * const pub_endpoint = "inproc://pubsub";
const char * const pair_endpoint = "inproc://pair";
void * sub_connector(void *p)
{
try {
zmq::context_t *ctx = (zmq::context_t *)p;
// connect to the pair socket first and send a message
zmq::socket_t push(*ctx, ZMQ_PUSH);
push.connect(pair_endpoint);
zmq::message_t msg(50);
memcpy(msg.data(), "hello world", sizeof("hello world"));
assert(true == push.send(msg, 0));
zmq::socket_t pub(*ctx, ZMQ_SUB);
pub.setsockopt(ZMQ_SUBSCRIBE, NULL, 0);
pub.connect(pub_endpoint);
assert("we should never get here");
}
catch (zmq::error_t e) {
cout << "0MQ exception in sub_connector. this is expected. what: " << e.what() << endl;
return (void *)0;
}
catch ( ... ) {
cout << "Unexpected exception in sub_connector" << endl;
return (void *)1;
}
return (void *)0;
}
void * pair_writer(void *p)
{
try {
zmq::context_t *ctx = (zmq::context_t *)p;
zmq::socket_t push(*ctx, ZMQ_PUSH);
push.connect(pair_endpoint);
int iteration = 0;
timespec ts;
ts.tv_sec = 0;
ts.tv_nsec = 1000000;
while (true) {
zmq::message_t m(sizeof(int));
memcpy(m.data(), &iteration, m.size());
assert(true == push.send(m, 0));
iteration++;
nanosleep(&ts, NULL);
}
}
catch (zmq::error_t e) {
cout << "Unexpected 0MQ exception in pair_writer: " << e.what() << endl;
return (void *)1;
}
catch ( ... ) {
cout << "Unexpected exception in pair_writer()" << endl;
return (void *)1;
}
return (void *)0;
}
// Drives the reproduction.
//
// It binds a PULL socket on pair_endpoint and queues one message into it.
// It then starts the pair_writer and sub_connector threads, joins
// sub_connector, and polls the PULL socket, recv()ing every message reported
// as readable.
//
// If the bug manifests, recv() blocks even though poll() reported the socket
// ready. If pair_writer's counter exceeds 100 without a hang, the program
// prints "did not reproduce" and exits with status 1.
int main(int argc, const char *argv[])
{
    zmq::context_t ctx(3);

    // PULL socket bound on the inproc pair endpoint.
    zmq::socket_t pair_pull(ctx, ZMQ_PULL);
    pair_pull.bind(pair_endpoint);

    // PUSH socket owned by this thread; queue one message before other threads start.
    zmq::socket_t pair_push(ctx, ZMQ_PUSH);
    pair_push.connect(pair_endpoint);
    {
        zmq::message_t msg(50);
        // Keep side effects out of assert() so they still run under NDEBUG.
        bool sent = pair_push.send(msg, 0);
        assert(sent);
        (void)sent;
    }

    // create a thread that writes a message to the pair socket every millisecond
    pthread_t t_pair_writer;
    int rc = pthread_create(&t_pair_writer, NULL, pair_writer, &ctx);
    assert(0 == rc);

    // create a thread that attempts to connect to a PUB-SUB socket
    // that isn't bound yet, and wait for it to finish
    pthread_t t_sub_connector;
    rc = pthread_create(&t_sub_connector, NULL, sub_connector, &ctx);
    assert(0 == rc);
    rc = pthread_join(t_sub_connector, NULL);
    assert(0 == rc);
    (void)rc;

    // now we read messages from the pair socket
    zmq::pollitem_t pollitem[1];
    pollitem[0].socket = static_cast<void *>(pair_pull);
    pollitem[0].events = ZMQ_POLLIN;
    pollitem[0].fd = 0;
    while (true) {
        cout << "polling for items...";
        // 0MQ 2.x takes the poll timeout in microseconds: wait up to 1 second.
        int result = zmq::poll(&pollitem[0], 1, 1000000);
        cout << "done" << endl;
        if (result < 1) continue;
        cout << "socket ready for reading" << endl;

        zmq::message_t msg;
        // ZMQ_RCVMORE is an int64_t option in 0MQ 2.x; size_t only matches on LP64.
        int64_t more;
        size_t moresz;
        while (true) {
            // endl flushes, so this line is visible if the following recv() hangs.
            cout << "attempting to recv() from pair_pull" << endl;
            if (!pair_pull.recv(&msg, 0)) {
                cout << "false returned from pair sock recv(). huh?" << endl;
                break;
            }
            cout << "received message on pair sock" << endl;
            // sizeof(int)-sized messages come from pair_writer and carry its counter.
            if (msg.size() == sizeof(int)) {
                int i;
                memcpy(&i, msg.data(), sizeof(i));
                if (i > 100) {
                    cout << "did not reproduce" << endl;
                    // NOTE(review): _exit skips destructors, presumably so context
                    // teardown cannot block on pair_writer's still-open socket — confirm.
                    _exit(1);
                }
            }
            moresz = sizeof(more);
            pair_pull.getsockopt(ZMQ_RCVMORE, &more, &moresz);
            if (!more) break;
        }
    }
    return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment