Skip to content

Instantly share code, notes, and snippets.

@ianbarber
Created December 5, 2010 12:59
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ianbarber/729057 to your computer and use it in GitHub Desktop.
Save ianbarber/729057 to your computer and use it in GitHub Desktop.
Distribute work across subprocesses using 0MQ
<?php
$num_workers = 5;
$pid = 1;
for($i = 0; $i < $num_workers; $i++) {
$pid = pcntl_fork();
if($pid == 0) {
break;
}
}
if($pid == 0) {
work($i);
} else {
serve($num_workers);
}
function work($id) {
$context = new ZMQContext();
$workpipe = new ZMQSocket($context, ZMQ::SOCKET_PULL);
$workpipe->connect("ipc://work.ipc");
$poll = new ZMQPoll();
$poll->add($workpipe, ZMQ::POLL_IN);
$controlpipe = new ZMQSocket($context, ZMQ::SOCKET_SUB);
$controlpipe->connect("ipc://control.ipc");
$controlpipe->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, "");
$controlpipe->setSockOpt(ZMQ::SOCKOPT_IDENTITY, "worker" . $id);
$controlpoll = new ZMQPoll();
$controlpoll->add($controlpipe, ZMQ::POLL_IN);
$syncpipe = new ZMQSOcket($context, ZMQ::SOCKET_PUSH);
$syncpipe->connect("ipc://sync.ipc");
$syncpipe->send($id);
$readable = array();
$worked = 0;
while(true) {
$events = $poll->poll($readable, null, 5000);
if($events > 0) {
foreach($readable as $r) {
$message = $r->recv();
//echo "Worker $id got message " . $message . "\n";
usleep(250);
$worked++;
}
} else {
$events = $controlpoll->poll($readable, null, 100);
if($events > 0) {
$message = $readable[0]->recv();
echo "$id Ending!\n";
break;
}
}
}
echo "Worker $id worked $worked times \n";
}
function serve($workers) {
$context = new ZMQContext();
$controlpipe = new ZMQSocket($context, ZMQ::SOCKET_PUB);
$controlpipe->bind("ipc://control.ipc");
$workpipe = new ZMQSocket($context, ZMQ::SOCKET_PUSH);
$workpipe->bind("ipc://work.ipc");
// Sync up with workers
$syncpipe = new ZMQSocket($context, ZMQ::SOCKET_PULL);
$syncpipe->bind("ipc://sync.ipc");
$found = 0;
$poll = new ZMQPoll();
$poll->add($syncpipe, ZMQ::POLL_IN);
$readable = array();
while(true) {
$found += $poll->poll($readable, null);;
foreach($readable as $r) {
$message = $r->recv();
echo "Hello $message!\n";
}
if($found == $workers) {
echo "All workers checked in\n";
break;
}
}
foreach(range(1, 10000) as $message) {
//echo "Sending $message\n";
$workpipe->send($message);
}
echo "Sending END\n";
$controlpipe->send("END");
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment