Created
August 21, 2012 16:11
-
-
Save melo/3416913 to your computer and use it in GitHub Desktop.
Strange problem with ZeroMQ perl bindings + fork
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// Demonstrate that zmq_init followed by fork does not die | |
// | |
// Compile/run: | |
// | |
// cc -lzmq -o test test.c ; ./test | |
// | |
#include <zmq.h> | |
#include <stdio.h> | |
#include <unistd.h> | |
#include <string.h> | |
#include <stdlib.h> | |
int main (void) | |
{ | |
void *context = zmq_init (1); | |
pid_t pid = fork(); | |
if (pid) { | |
void *responder = zmq_socket (context, ZMQ_REP); | |
// Socket to talk to clients | |
zmq_bind (responder, "ipc://xpto.sock"); | |
while (1) { | |
// Wait for next request from client | |
zmq_msg_t request; | |
zmq_msg_init (&request); | |
zmq_recv (responder, &request, 0); | |
char payload[10]; | |
int size = zmq_msg_size(&request); | |
memcpy(payload, zmq_msg_data(&request), size); | |
payload[size] = '\0'; | |
if (strncmp(payload, "quit", 4) == 0) { | |
printf("[%d] Received '%s', sleep a bit\n", getpid(), payload); | |
sleep(4); | |
break; | |
} | |
printf ("[%d] Received '%s'\n", getpid(), payload); | |
zmq_msg_close (&request); | |
// Do some 'work' | |
sleep (1); | |
// Send reply back to client | |
zmq_msg_t reply; | |
zmq_msg_init_size (&reply, 5); | |
memcpy (zmq_msg_data (&reply), "World", 5); | |
zmq_send (responder, &reply, 0); | |
zmq_msg_close (&reply); | |
} | |
// We never get here but if we did, this would be how we end | |
printf("[%d] Parent is cleaning up\n", getpid()); | |
zmq_close (responder); | |
zmq_term (context); | |
printf("[%d] waiting for child\n", getpid()); | |
int status; | |
int rpid = waitpid(pid, &status, 0); | |
printf("[%d] waited for pid %d, got %d status %d\n", getpid(), pid, rpid, status); | |
printf("[%d] Parent is exiting\n", getpid()); | |
} | |
else { | |
context = zmq_init (1); | |
// Socket to talk to server | |
printf ("[%d] Connecting to hello world server...\n", getpid()); | |
void *requester = zmq_socket (context, ZMQ_REQ); | |
zmq_connect (requester, "ipc://xpto.sock"); | |
int request_nbr; | |
zmq_msg_t request; | |
for (request_nbr = 0; request_nbr != 2; request_nbr++) { | |
zmq_msg_init_size (&request, 5); | |
memcpy (zmq_msg_data (&request), "Hello", 5); | |
printf ("[%d] Sending Hello %d...\n", getpid(), request_nbr); | |
zmq_send (requester, &request, 0); | |
zmq_msg_close (&request); | |
zmq_msg_t reply; | |
zmq_msg_init (&reply); | |
zmq_recv (requester, &reply, 0); | |
printf ("[%d] Received World %d\n", getpid(), request_nbr); | |
zmq_msg_close (&reply); | |
} | |
zmq_msg_init_size (&request, 4); | |
memcpy (zmq_msg_data (&request), "quit", 4); | |
printf ("[%d] Sending quit...\n", getpid()); | |
zmq_send (requester, &request, 0); | |
zmq_msg_close (&request); | |
sleep(1); | |
printf("[%d] Child is cleaning up\n", getpid()); | |
zmq_close (requester); | |
zmq_term (context); | |
printf("[%d] Child is exiting()\n", getpid()); | |
exit(0); | |
} | |
return 0; | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env perl | |
# | |
# Move the first zmq_init to before the fork to trigger the assertion | |
# | |
use strict; | |
use warnings; | |
use ZeroMQ::Raw; | |
use ZeroMQ::Constants ':all'; | |
my $pid = fork(); | |
my $ctx = ZeroMQ::Raw::zmq_init(1); ## Move this line after the fork, and no assertion | |
if ($pid) { ## parent | |
my $sub = ZeroMQ::Raw::zmq_socket($ctx, ZMQ_SUB); | |
ZeroMQ::Raw::zmq_setsockopt($sub, ZMQ_SUBSCRIBE, ''); | |
ZeroMQ::Raw::zmq_bind($sub, 'ipc://x.sock'); | |
while (1) { | |
my $msg = ZeroMQ::Raw::zmq_recv($sub, ZMQ_NOBLOCK); | |
if ($msg) { | |
my $t = ZeroMQ::Raw::zmq_msg_data($msg); | |
l("Topic '$t'"); | |
$msg = ZeroMQ::Raw::zmq_recv($sub, 0); | |
my $p = ZeroMQ::Raw::zmq_msg_data($msg); | |
l("Payload '$p'"); | |
last if $p eq 'quit'; | |
} | |
l("## No message, sleep a bit"); | |
sleep(2); | |
} | |
l("And we are done..."); | |
ZeroMQ::Raw::zmq_close($sub); | |
ZeroMQ::Raw::zmq_term($ctx); | |
waitpid($pid, 0); | |
END { unlink('x.sock') } | |
} | |
else { | |
l('Wait a bit to start publisher...'); | |
sleep(1); | |
my $ctx = ZeroMQ::Raw::zmq_init(1); | |
my $pub = ZeroMQ::Raw::zmq_socket($ctx, ZMQ_PUB); | |
ZeroMQ::Raw::zmq_connect($pub, 'ipc://x.sock'); | |
ZeroMQ::Raw::zmq_send($pub, 't1', ZMQ_SNDMORE); | |
ZeroMQ::Raw::zmq_send($pub, '1', 0); | |
sleep(1); | |
ZeroMQ::Raw::zmq_send($pub, 't4', ZMQ_SNDMORE); | |
ZeroMQ::Raw::zmq_send($pub, '2 one thousand', 0); | |
sleep(1); | |
ZeroMQ::Raw::zmq_send($pub, 't3', ZMQ_SNDMORE); | |
ZeroMQ::Raw::zmq_send($pub, '3 is done', 0); | |
sleep(1); | |
ZeroMQ::Raw::zmq_send($pub, 't2', ZMQ_SNDMORE); | |
ZeroMQ::Raw::zmq_send($pub, 'quit', 0); | |
sleep(1); | |
l("Publisher is done..."); | |
ZeroMQ::Raw::zmq_close($pub); | |
ZeroMQ::Raw::zmq_term($ctx); | |
l("Publisher exits, stage left"); | |
} | |
#sub l {} | |
sub l { print STDERR "## [$$] ", @_, "\n" } |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env perl | |
use strict; | |
use warnings; | |
use ZMQ::LibZMQ2; | |
use ZMQ::Constants ':v2.1.11', ':all'; | |
my $pid = fork(); | |
my $ctx = zmq_init(1); ## Move this line after the fork, and no assertion | |
if ($pid) { ## parent | |
my $sub = zmq_socket($ctx, ZMQ_SUB); | |
zmq_setsockopt($sub, ZMQ_SUBSCRIBE, ''); | |
zmq_bind($sub, 'ipc://x.sock'); | |
while (1) { | |
l("## Checking for messages"); | |
my $msg = zmq_recv($sub, ZMQ_NOBLOCK); | |
if ($msg) { | |
my $t = zmq_msg_data($msg); | |
l("Topic '$t'"); | |
$msg = zmq_recv($sub, 0); | |
my $p = zmq_msg_data($msg); | |
l("Payload '$p'"); | |
last if $p eq 'quit'; | |
} | |
l("## No message, sleep a bit"); | |
sleep(3); | |
} | |
l("And we are done..."); | |
zmq_close($sub); | |
zmq_term($ctx); | |
waitpid($pid, 0); | |
END { unlink('x.sock') } | |
} | |
else { | |
l('Wait a bit to start publisher...'); | |
sleep(1); | |
my $ctx = zmq_init(1); | |
my $pub = zmq_socket($ctx, ZMQ_PUB); | |
zmq_connect($pub, 'ipc://x.sock'); | |
zmq_send($pub, 't1', ZMQ_SNDMORE); | |
zmq_send($pub, '1', 0); | |
sleep(1); | |
zmq_send($pub, 't4', ZMQ_SNDMORE); | |
zmq_send($pub, '2 one thousand', 0); | |
sleep(1); | |
zmq_send($pub, 't3', ZMQ_SNDMORE); | |
zmq_send($pub, '3 is done', 0); | |
sleep(1); | |
zmq_send($pub, 't2', ZMQ_SNDMORE); | |
zmq_send($pub, 'quit', 0); | |
sleep(1); | |
l("Publisher is done..."); | |
zmq_close($pub); | |
zmq_term($ctx); | |
l("Publisher exits, stage left"); | |
} | |
#sub l {} | |
sub l { print STDERR "## [$$] ", @_, "\n" } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment