|
<?php |
|
/** |
|
* This prototype implements a ventilator pattern using 10 forked workers to consume 10M messages from the queue in parallel. |
|
*/ |
|
|
|
|
|
/**
 * Pull up to $maxWork messages from the ventilator and hash each one.
 *
 * Connects a ZMQ PULL socket to the "ipc://routing.ipc" endpoint and then
 * blocks on recv() until $maxWork messages have been consumed.
 *
 * @param int|string $wid     Worker identifier; only used in log output.
 * @param int|float  $maxWork Number of messages to consume before returning.
 *
 * @return bool True once $maxWork messages were processed, false when the
 *              socket could not be connected.
 */
function doWork($wid, $maxWork)
{
    $context = new ZMQContext();
    $worker = $context->getSocket(ZMQ::SOCKET_PULL);

    // ZMQSocket::connect() returns the socket itself and reports failure by
    // throwing ZMQSocketException, so testing its return value can never
    // detect an error. Catch the exception to keep the documented
    // "return false on failure" contract reachable.
    try {
        $worker->connect("ipc://routing.ipc");
    } catch (ZMQSocketException $e) {
        echo ". Worker $wid failed to connect...\n";

        return false;
    }

    echo ". Worker $wid connected successfully to downstream!\n";

    for ($total = 0; $total < $maxWork; $total++) {
        // recv() blocks until the next message arrives.
        $request = $worker->recv();

        // md5() stands in for real per-message work; the digest is discarded.
        md5($request);
    }

    return true;
}
|
|
|
/**
 * Signal handler for worker processes: prints which kind of signal was
 * caught together with the pid of the current process.
 *
 * @param int $sigNum Number of the signal being handled.
 *
 * @return void
 */
function sig_handler($sigNum)
{
    $currentPid = posix_getpid();

    // Loose comparison on purpose: it mirrors the matching rules of a switch.
    if ($sigNum == SIGUSR1) {
        echo "> Stable signal received from parent in child pid $currentPid\n";

        return;
    }

    echo "> Caught unknown signal from child pid $currentPid\n";
}
|
|
|
/*
 * Fork the worker pool.
 *
 * Each child waits for SIGUSR1 from the parent (sent once the ventilator
 * socket is bound), consumes its share of the messages via doWork(), writes
 * its timing to the file "worker.<wid>" and exits. The parent records the
 * pid of every successfully forked child in $workers.
 */
$numWorkers = 10;
$totalWork = 10000000;
$maxWork = (int) ceil($totalWork / $numWorkers);
$workers = [];

/*
 * Block SIGUSR1 *before* forking. The signal mask is inherited across
 * fork(), so a SIGUSR1 sent by the parent stays pending in the child until
 * pcntl_sigwaitinfo() collects it. Blocking only inside the child (after the
 * fork) leaves a window in which the signal can arrive first and either
 * terminate the child (no handler installed yet) or be consumed by the
 * handler, leaving pcntl_sigwaitinfo() waiting forever.
 * The parent keeps SIGUSR1 blocked as well; nothing in this script sends it
 * to the parent.
 */
pcntl_sigprocmask(SIG_BLOCK, [SIGUSR1]);

foreach (range(1, $numWorkers) as $wid) {
    $pid = pcntl_fork();

    if ($pid === -1) {
        /* Fork failed - carry on with the remaining workers */
        echo "- Worker $wid failed to fork...\n";
        continue;
    }

    if ($pid) {
        /* Fork successful - still in parent context */
        echo "* Worker $wid with pid $pid forked successfully!\n";
        $workers[$wid] = $pid;
        continue;
    }

    /* Fork successful - now in child context */

    /* Register the signal handler for workers */
    pcntl_signal(SIGUSR1, "sig_handler");

    /* Wait for parent process to bind before attempting to connect */
    $work = false;
    $sig = pcntl_sigwaitinfo([SIGUSR1], $info);

    if ($sig == SIGUSR1) {
        echo "> Worker $wid received signal to connect...\n";

        /* Time the whole consume loop, including the connect */
        $t = microtime(true);
        $work = doWork($wid, $maxWork);
        $t = microtime(true) - $t;

        file_put_contents("worker.$wid", sprintf("# Worker $wid handled $maxWork messages in %.6f seconds\n", $t));
    }

    $worked = $work ? ' with work completed' : ' without any work completed';
    echo "+ Worker $wid now exiting{$worked}...\n";

    /* Never fall through into the parent-only code below */
    exit;
}
|
|
|
/*
 * Parent (ventilator) side: bind the PUSH socket, tell the workers to
 * connect, push $totalWork messages and record how long the sending took in
 * the file "parent".
 */

/* A PUSH socket with no downstream peers blocks on send(), so bail out early */
if ($workers === []) {
    echo "- No workers were forked, nothing to send to...\n";
    exit(1);
}

$context = new ZMQContext();

/* Bind the ventilator client */
try {
    $client = new ZMQSocket($context, ZMQ::SOCKET_PUSH);
    $client->bind("ipc://routing.ipc");
} catch (ZMQSocketException $e) {
    /*
     * The workers are blocked waiting for SIGUSR1 and would wait forever
     * if the parent died here, so terminate them before giving up.
     */
    echo "- Failed to bind the ventilator socket: {$e->getMessage()}\n";

    foreach ($workers as $wid => $pid) {
        posix_kill($pid, SIGTERM);
    }

    exit(1);
}

/* signal the child workers to connect */
foreach ($workers as $wid => $pid) {
    if (posix_kill($pid, SIGUSR1)) {
        echo "+ Signal sent to worker $wid with pid $pid successfully!\n";
    } else {
        echo "+ Failed to send signal to worker $wid with pid $pid...\n";
    }
}

/* give the workers a chance to connect before sending */
sleep(2);

$t = microtime(true);

for ($request = 1; $request <= $totalWork; $request++) {
    /* send() takes a string payload; make the int-to-string conversion explicit */
    $client->send((string) $request);
}

$t = microtime(true) - $t;

file_put_contents("parent", sprintf("%% Parent sent $totalWork messages in %.6f seconds\n", $t));
|
|
|
/* |
|
In some cases you might need to wait for workers to exit before exiting the parent process if some |
|
workers are slow to consume. |
|
*/ |