Skip to content

Instantly share code, notes, and snippets.

@melo
Created August 21, 2012 16:11
Show Gist options
  • Save melo/3416913 to your computer and use it in GitHub Desktop.
Save melo/3416913 to your computer and use it in GitHub Desktop.
Strange problem with ZeroMQ perl bindings + fork
//
// Demonstrate that zmq_init followed by fork does not die
//
// Compile/run:
//
// cc -o test test.c -lzmq ; ./test
//
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/wait.h>
#include <unistd.h>
#include <zmq.h>
// Entry point of the fork demonstration.
//
// A ZeroMQ context is created, then the process forks:
//   * the parent binds a REP socket on ipc://xpto.sock, answers each request
//     with "World" and leaves its loop when a request starts with "quit";
//   * the child creates a context of its own, connects a REQ socket, sends
//     two "Hello" requests followed by "quit", then exits.
//
// Returns 0 from the parent on normal completion and 1 when fork() fails;
// the child leaves through exit(0).
int main (void)
{
    // Created before fork() on purpose: this ordering is what the demo tests.
    void *context = zmq_init (1);
    pid_t pid = fork();
    if (pid < 0) {
        // A failed fork() must not be mistaken for "we are the parent":
        // the parent branch would block forever waiting for a client.
        perror ("fork");
        zmq_term (context);
        return 1;
    }
    if (pid) {
        void *responder = zmq_socket (context, ZMQ_REP);
        // Socket to talk to clients
        zmq_bind (responder, "ipc://xpto.sock");
        while (1) {
            // Wait for next request from client
            zmq_msg_t request;
            zmq_msg_init (&request);
            zmq_recv (responder, &request, 0);
            // Copy at most sizeof(payload) - 1 bytes so that an oversized
            // request is truncated instead of overflowing the buffer.
            char payload[10];
            size_t size = zmq_msg_size (&request);
            if (size >= sizeof (payload))
                size = sizeof (payload) - 1;
            memcpy (payload, zmq_msg_data (&request), size);
            payload[size] = '\0';
            // payload now holds its own copy; release the message before
            // any early exit so the "quit" path does not leak it.
            zmq_msg_close (&request);
            if (strncmp (payload, "quit", 4) == 0) {
                printf ("[%d] Received '%s', sleep a bit\n", getpid(), payload);
                sleep (4);
                break;
            }
            printf ("[%d] Received '%s'\n", getpid(), payload);
            // Do some 'work'
            sleep (1);
            // Send reply back to client
            zmq_msg_t reply;
            zmq_msg_init_size (&reply, 5);
            memcpy (zmq_msg_data (&reply), "World", 5);
            zmq_send (responder, &reply, 0);
            zmq_msg_close (&reply);
        }
        // Reached once a "quit" request has been received
        printf ("[%d] Parent is cleaning up\n", getpid());
        zmq_close (responder);
        zmq_term (context);
        printf ("[%d] waiting for child\n", getpid());
        int status;
        int rpid = waitpid (pid, &status, 0);
        printf ("[%d] waited for pid %d, got %d status %d\n", getpid(), pid, rpid, status);
        printf ("[%d] Parent is exiting\n", getpid());
    }
    else {
        // The child creates a context of its own instead of using the one
        // that was set up before fork().
        context = zmq_init (1);
        // Socket to talk to server
        printf ("[%d] Connecting to hello world server...\n", getpid());
        void *requester = zmq_socket (context, ZMQ_REQ);
        zmq_connect (requester, "ipc://xpto.sock");
        int request_nbr;
        zmq_msg_t request;
        for (request_nbr = 0; request_nbr != 2; request_nbr++) {
            zmq_msg_init_size (&request, 5);
            memcpy (zmq_msg_data (&request), "Hello", 5);
            printf ("[%d] Sending Hello %d...\n", getpid(), request_nbr);
            zmq_send (requester, &request, 0);
            zmq_msg_close (&request);
            zmq_msg_t reply;
            zmq_msg_init (&reply);
            zmq_recv (requester, &reply, 0);
            printf ("[%d] Received World %d\n", getpid(), request_nbr);
            zmq_msg_close (&reply);
        }
        // Ask the parent to stop; no reply is read for this last request
        zmq_msg_init_size (&request, 4);
        memcpy (zmq_msg_data (&request), "quit", 4);
        printf ("[%d] Sending quit...\n", getpid());
        zmq_send (requester, &request, 0);
        zmq_msg_close (&request);
        sleep (1);
        printf ("[%d] Child is cleaning up\n", getpid());
        zmq_close (requester);
        zmq_term (context);
        printf ("[%d] Child is exiting()\n", getpid());
        exit (0);
    }
    return 0;
}
#!/usr/bin/env perl
#
# Move the first zmq_init to before the fork to trigger the assertion
#
use strict;
use warnings;
use ZeroMQ::Raw;
use ZeroMQ::Constants ':all';
# Fork into a subscriber (parent) and a publisher (child) that talk over
# ipc://x.sock. The parent polls a SUB socket until it reads the payload
# 'quit'; the child publishes four topic/payload pairs and then exits.
my $pid = fork();

# fork() returns undef on failure; without this check the script would
# silently run the publisher branch with no subscriber listening.
defined $pid or die "fork failed: $!";

my $ctx = ZeroMQ::Raw::zmq_init(1); ## Placed after the fork: no assertion. Move this line before the fork to trigger it
if ($pid) { ## parent
    my $sub = ZeroMQ::Raw::zmq_socket($ctx, ZMQ_SUB);

    # Subscribe with an empty filter string, then wait for the publisher.
    ZeroMQ::Raw::zmq_setsockopt($sub, ZMQ_SUBSCRIBE, '');
    ZeroMQ::Raw::zmq_bind($sub, 'ipc://x.sock');
    while (1) {
        # First frame is polled with ZMQ_NOBLOCK so the loop can idle.
        my $msg = ZeroMQ::Raw::zmq_recv($sub, ZMQ_NOBLOCK);
        if ($msg) {
            my $t = ZeroMQ::Raw::zmq_msg_data($msg);
            l("Topic '$t'");

            # The publisher sends the topic with ZMQ_SNDMORE, so a payload
            # frame follows; read it with a blocking receive.
            $msg = ZeroMQ::Raw::zmq_recv($sub, 0);
            my $p = ZeroMQ::Raw::zmq_msg_data($msg);
            l("Payload '$p'");
            last if $p eq 'quit';
        }
        else {
            # Only report an idle poll when nothing was actually received.
            l("## No message, sleep a bit");
        }
        sleep(2);
    }
    l("And we are done...");
    ZeroMQ::Raw::zmq_close($sub);
    ZeroMQ::Raw::zmq_term($ctx);
    waitpid($pid, 0);

    # NOTE: END blocks are registered at compile time regardless of the
    # branch they appear in, so this unlink runs at exit in the child too.
    END { unlink('x.sock') }
}
else {
    l('Wait a bit to start publisher...');
    sleep(1);

    # The child deliberately creates a second context of its own; it shadows
    # the file-level $ctx inside this branch.
    my $ctx = ZeroMQ::Raw::zmq_init(1);
    my $pub = ZeroMQ::Raw::zmq_socket($ctx, ZMQ_PUB);
    ZeroMQ::Raw::zmq_connect($pub, 'ipc://x.sock');

    # Each message is two frames: topic (ZMQ_SNDMORE) followed by payload.
    ZeroMQ::Raw::zmq_send($pub, 't1', ZMQ_SNDMORE);
    ZeroMQ::Raw::zmq_send($pub, '1', 0);
    sleep(1);
    ZeroMQ::Raw::zmq_send($pub, 't4', ZMQ_SNDMORE);
    ZeroMQ::Raw::zmq_send($pub, '2 one thousand', 0);
    sleep(1);
    ZeroMQ::Raw::zmq_send($pub, 't3', ZMQ_SNDMORE);
    ZeroMQ::Raw::zmq_send($pub, '3 is done', 0);
    sleep(1);

    # The 'quit' payload tells the subscriber to leave its loop.
    ZeroMQ::Raw::zmq_send($pub, 't2', ZMQ_SNDMORE);
    ZeroMQ::Raw::zmq_send($pub, 'quit', 0);
    sleep(1);
    l("Publisher is done...");
    ZeroMQ::Raw::zmq_close($pub);
    ZeroMQ::Raw::zmq_term($ctx);
    l("Publisher exits, stage left");
}
#sub l {}
# Write one diagnostic line to STDERR, prefixed with "## [<pid>] " so the
# output of the parent and the child can be told apart.
sub l {
    my @pieces = @_;
    print STDERR "## [$$] ", @pieces, "\n";
}
#!/usr/bin/env perl
use strict;
use warnings;
use ZMQ::LibZMQ2;
use ZMQ::Constants ':v2.1.11', ':all';
# Fork into a subscriber (parent) and a publisher (child) that talk over
# ipc://x.sock. The parent polls a SUB socket until it reads the payload
# 'quit'; the child publishes four topic/payload pairs and then exits.
my $pid = fork();

# fork() returns undef on failure; without this check the script would
# silently run the publisher branch with no subscriber listening.
defined $pid or die "fork failed: $!";

my $ctx = zmq_init(1); ## Placed after the fork: no assertion expected. Moving this line before the fork is the failing variant
if ($pid) { ## parent
    my $sub = zmq_socket($ctx, ZMQ_SUB);

    # Subscribe with an empty filter string, then wait for the publisher.
    zmq_setsockopt($sub, ZMQ_SUBSCRIBE, '');
    zmq_bind($sub, 'ipc://x.sock');
    while (1) {
        l("## Checking for messages");

        # First frame is polled with ZMQ_NOBLOCK so the loop can idle.
        my $msg = zmq_recv($sub, ZMQ_NOBLOCK);
        if ($msg) {
            my $t = zmq_msg_data($msg);
            l("Topic '$t'");

            # The publisher sends the topic with ZMQ_SNDMORE, so a payload
            # frame follows; read it with a blocking receive.
            $msg = zmq_recv($sub, 0);
            my $p = zmq_msg_data($msg);
            l("Payload '$p'");
            last if $p eq 'quit';
        }
        else {
            # Only report an idle poll when nothing was actually received.
            l("## No message, sleep a bit");
        }
        sleep(3);
    }
    l("And we are done...");
    zmq_close($sub);
    zmq_term($ctx);
    waitpid($pid, 0);

    # NOTE: END blocks are registered at compile time regardless of the
    # branch they appear in, so this unlink runs at exit in the child too.
    END { unlink('x.sock') }
}
else {
    l('Wait a bit to start publisher...');
    sleep(1);

    # The child deliberately creates a second context of its own; it shadows
    # the file-level $ctx inside this branch.
    my $ctx = zmq_init(1);
    my $pub = zmq_socket($ctx, ZMQ_PUB);
    zmq_connect($pub, 'ipc://x.sock');

    # Each message is two frames: topic (ZMQ_SNDMORE) followed by payload.
    zmq_send($pub, 't1', ZMQ_SNDMORE);
    zmq_send($pub, '1', 0);
    sleep(1);
    zmq_send($pub, 't4', ZMQ_SNDMORE);
    zmq_send($pub, '2 one thousand', 0);
    sleep(1);
    zmq_send($pub, 't3', ZMQ_SNDMORE);
    zmq_send($pub, '3 is done', 0);
    sleep(1);

    # The 'quit' payload tells the subscriber to leave its loop.
    zmq_send($pub, 't2', ZMQ_SNDMORE);
    zmq_send($pub, 'quit', 0);
    sleep(1);
    l("Publisher is done...");
    zmq_close($pub);
    zmq_term($ctx);
    l("Publisher exits, stage left");
}
#sub l {}
# Emit a single log line on STDERR; the "## [<pid>] " prefix identifies
# which process (subscriber or publisher) produced it.
sub l {
    my @fragments = @_;
    print STDERR "## [$$] ", @fragments, "\n";
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment