@indygreg
Created November 28, 2010 02:47
ZeroMQ Blocking recv() Bug Reproduce
// 0MQ "data ready" + recv() block bug by Gregory Szorc <gregory.szorc@gmail.com>
//
// This file produces a bug where a 0MQ socket says it has a message
// available but the call to recv() blocks. This should not happen.
//
// Reproduce conditions:
//
// Definitely on Linux. Not on Windows
//
// For me, this reproduces on x86_64 Linux most of the time, but not always.
// I'm not sure if the threading matters. I reproduced the bug from a
// program with threading, so I included the threading just because.
//
// Gist:
//
// 1) Create a PULL socket and bind via inproc://
// 2) Create a PUSH socket connect() to previous PULL socket
// 3) Send a message on PUSH
// 4) Create a new thread and connect() a SUB socket to an inproc:// endpoint
// that hasn't been bound. This throws an error. We do this in
// a separate thread, but I'm not sure if that matters.
// 5) Create a new thread and connect() to the main thread's PULL socket and
// start writing messages.
// 6) In a poll loop on the main thread, the PULL socket is marked as
// ready for reading, but recv() blocks.
//
// Instructions:
//
// g++ -I/path/to/zmq/include -o zmqbug zmq_bug.cpp -lzmq -lpthread
//
// You should see some debugging output as the program runs. Eventually, it
// will freeze at "attempting to recv() from pair_pull". At this point,
// the bug has manifested. For me, if it hangs, it will hang in the first 3
// seconds. If it runs longer than that, no repro. The test can likely
// be reduced to something simpler and more reliable.
//
// The stack on my machine at this point looks like:
//
// #0 0x00007ffff76ec6cc in __libc_recv (fd=15, buf=<value optimized out>, n=<value optimized out>, flags=<value optimized out>) at ../sysdeps/unix/sysv/linux/x86_64/recv.c:34
// #1 0x00007ffff7bc5489 in recv (this=<value optimized out>, cmd_=0x7fffffffe3a0, block_=<value optimized out>) at /usr/include/bits/socket2.h:45
// #2 zmq::signaler_t::recv (this=<value optimized out>, cmd_=0x7fffffffe3a0, block_=<value optimized out>) at signaler.cpp:263
// #3 0x00007ffff7bb6264 in zmq::app_thread_t::process_commands (this=0x606800, block_=<value optimized out>, throttle_=<value optimized out>) at app_thread.cpp:88
// #4 0x00007ffff7bc5aaf in zmq::socket_base_t::recv(struct {...} *, int) (this=0x606850, msg_=0x7fffffffe500, flags_=0) at socket_base.cpp:435
// #5 0x000000000040263b in zmq::socket_t::recv(zmq::message_t*, int) ()
// #6 0x0000000000401bea in main ()
//
// This was built against ZeroMQ 2.0.10 (53d1677c8b0f85e309c6a067b47c80dedcffb5aa)
#include <assert.h>
#include <pthread.h>
#include <string.h>   // memcpy
#include <time.h>     // nanosleep
#include <unistd.h>   // _exit
#include <zmq.hpp>
#include <iostream>
using std::cout;
using std::endl;
const char * pub_endpoint = "inproc://pubsub";
const char * pair_endpoint = "inproc://pair";
void * sub_connector(void *p)
{
    try {
        zmq::context_t *ctx = (zmq::context_t *)p;
        // connect a PUSH socket to the main thread's PULL socket first and send a message
        zmq::socket_t push(*ctx, ZMQ_PUSH);
        push.connect(pair_endpoint);
        zmq::message_t msg(50);
        memcpy(msg.data(), "hello world", sizeof("hello world"));
        assert(true == push.send(msg, 0));
        // connecting a SUB socket to an unbound inproc:// endpoint should throw
        zmq::socket_t pub(*ctx, ZMQ_SUB);
        pub.setsockopt(ZMQ_SUBSCRIBE, NULL, 0);
        pub.connect(pub_endpoint);
        // a bare string literal is always true, so negate it to make the assert fire
        assert(!"we should never get here");
    }
    catch (const zmq::error_t &e) {
        cout << "0MQ exception in sub_connector. this is expected. what: " << e.what() << endl;
        return (void *)0;
    }
    catch ( ... ) {
        cout << "Unexpected exception in sub_connector" << endl;
        return (void *)1;
    }
    return (void *)0;
}
void * pair_writer(void *p)
{
    try {
        zmq::context_t *ctx = (zmq::context_t *)p;
        zmq::socket_t push(*ctx, ZMQ_PUSH);
        push.connect(pair_endpoint);
        int iteration = 0;
        // sleep 1 ms (1,000,000 ns) between messages
        timespec ts;
        ts.tv_sec = 0;
        ts.tv_nsec = 1000000;
        while (true) {
            zmq::message_t m(sizeof(int));
            memcpy(m.data(), &iteration, m.size());
            assert(true == push.send(m, 0));
            iteration++;
            nanosleep(&ts, NULL);
        }
    }
    catch (const zmq::error_t &e) {
        cout << "Unexpected 0MQ exception in pair_writer: " << e.what() << endl;
        return (void *)1;
    }
    catch ( ... ) {
        cout << "Unexpected exception in pair_writer()" << endl;
        return (void *)1;
    }
    return (void *)0;
}
int main(int argc, const char *argv[])
{
    zmq::context_t ctx(3);
    zmq::socket_t pair_pull(ctx, ZMQ_PULL);
    pair_pull.bind(pair_endpoint);
    zmq::socket_t pair_push(ctx, ZMQ_PUSH);
    pair_push.connect(pair_endpoint);
    {
        zmq::message_t msg(50);
        assert(true == pair_push.send(msg, 0));
    }
    // create a thread that pushes a message to the PULL socket every millisecond
    pthread_t t_pair_writer;
    assert(0 == pthread_create(&t_pair_writer, NULL, pair_writer, (void *)&ctx));
    // create a thread that attempts to connect a SUB socket to an endpoint
    // that isn't bound yet
    pthread_t t_sub_connector;
    assert(0 == pthread_create(&t_sub_connector, NULL, sub_connector, (void *)&ctx));
    assert(0 == pthread_join(t_sub_connector, NULL));
    // now we read messages from the PULL socket
    zmq::pollitem_t pollitem[1];
    pollitem[0].socket = (void *)pair_pull;
    pollitem[0].events = ZMQ_POLLIN;
    pollitem[0].fd = 0;
    while (true) {
        cout << "polling for items...";
        int result = zmq::poll(&pollitem[0], 1, 1000000);
        cout << "done" << endl;
        if (result < 1) continue;
        cout << "socket ready for reading" << endl;
        zmq::message_t msg;
        size_t more = 0;
        size_t moresz;
        while (true) {
            cout << "attempting to recv() from pair_pull" << endl;
            if (!pair_pull.recv(&msg, 0)) {
                cout << "false returned from pair sock recv(). huh?" << endl;
                break;
            }
            cout << "received message on pair sock" << endl;
            if (msg.size() == sizeof(int)) {
                int i;
                memcpy(&i, msg.data(), sizeof(i));
                if (i > 100) {
                    cout << "did not reproduce" << endl;
                    _exit(1);
                }
            }
            moresz = sizeof(more);
            pair_pull.getsockopt(ZMQ_RCVMORE, &more, &moresz);
            if (!more) break;
        }
    }
    return 0;
}
@DmitriiGorelov
DmitriiGorelov commented Feb 2, 2024

First, there is a function, context.shutdown(), that terminates all blocked operations such as socket.recv() or socket.send(). You can call context.shutdown() when handling a signal like Ctrl+C, or wherever you tell the program it has to exit. Second, wrap the socket.recv/send loop in a try-catch block, like the answer above suggests:

try
{
    // Any ZMQ socket sending or receiving
}
catch (const zmq::error_t& e)
{
    if (e.num() == ETERM)
    {
        // Catch a termination error.
        break; // or return, or end your loop
    }
}
Finally, after the try-catch block, at the end of your recv/send loop, place the following:

if (socket.handle()) socket.close();
context.close();

and then exit the program normally. Worked for me, hope it helps you.
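The control flow described in this comment (a blocked receive loop, a shutdown call from elsewhere, a caught termination error, then normal cleanup) can be sketched without ZeroMQ at all. The block below is only an illustration built on the C++ standard library: `MockChannel`, `TerminatedError`, `receive_until_shutdown`, and `run_demo` are hypothetical names invented here and are not part of cppzmq or libzmq. One deliberate simplification, assumed here to keep the demo deterministic: the mock hands out messages that were already queued before it reports termination.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <stdexcept>
#include <string>
#include <thread>
#include <utility>

// Hypothetical stand-in for the "context terminated" error; NOT zmq::error_t.
struct TerminatedError : std::runtime_error {
    TerminatedError() : std::runtime_error("channel terminated") {}
};

// Hypothetical stand-in for a context + socket pair; NOT the cppzmq API.
class MockChannel {
public:
    void send(std::string msg) {
        {
            std::lock_guard<std::mutex> lock(mu_);
            queue_.push_back(std::move(msg));
        }
        cv_.notify_one();
    }

    // Blocks until a message is queued or shutdown() has been called.
    // Throws TerminatedError once the channel is stopped and drained.
    std::string recv() {
        std::unique_lock<std::mutex> lock(mu_);
        cv_.wait(lock, [this] { return stopped_ || !queue_.empty(); });
        if (queue_.empty()) throw TerminatedError();
        std::string msg = std::move(queue_.front());
        queue_.pop_front();
        return msg;
    }

    // Wakes every blocked recv() so the receive loop can exit.
    void shutdown() {
        {
            std::lock_guard<std::mutex> lock(mu_);
            stopped_ = true;
        }
        cv_.notify_all();
    }

private:
    std::mutex mu_;
    std::condition_variable cv_;
    std::deque<std::string> queue_;
    bool stopped_ = false;
};

// Receive loop wrapped in try/catch: counts messages until termination.
int receive_until_shutdown(MockChannel& ch) {
    int received = 0;
    try {
        while (true) {
            ch.recv();
            ++received;
        }
    } catch (const TerminatedError&) {
        // Expected on shutdown: leave the loop and fall through to cleanup.
    }
    return received;
}

// Sends `count` messages, requests shutdown, and waits for the reader thread.
int run_demo(int count) {
    assert(count >= 0);
    MockChannel ch;
    int received = 0;
    std::thread reader([&] { received = receive_until_shutdown(ch); });
    for (int i = 0; i < count; ++i) ch.send("msg");
    ch.shutdown();   // plays the role of the shutdown call made on exit
    reader.join();   // the reader leaves its loop instead of blocking forever
    return received;
}
```

Calling `run_demo(3)` from a `main()` starts the reader thread, queues three messages, and returns once the reader has left its loop. In a real program the `shutdown()` call would more likely sit in whatever code reacts to the exit request, with the socket and context closed after the loop as the comment describes.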
