@srgoogleguy
Created April 27, 2015 05:09

Results from benchmark

  • Worker 1 handled 1000000 messages in 6.778385 seconds

  • Worker 2 handled 1000000 messages in 6.769902 seconds

  • Worker 3 handled 1000000 messages in 7.063248 seconds

  • Worker 4 handled 1000000 messages in 6.603810 seconds

  • Worker 5 handled 1000000 messages in 7.295715 seconds

  • Worker 6 handled 1000000 messages in 6.916242 seconds

  • Worker 7 handled 1000000 messages in 7.018777 seconds

  • Worker 8 handled 1000000 messages in 6.845328 seconds

  • Worker 9 handled 1000000 messages in 6.775695 seconds

  • Worker 10 handled 1000000 messages in 7.020921 seconds

  • Parent sent 10000000 messages in 5.735235 seconds

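We successfully sent 10 million messages through the ventilator and processed them across 10 parallel workers in under 8 seconds. Each worker consumed 1 million messages in roughly 6.6 to 7.3 seconds (an average of ~145K messages per second per worker), while the ventilator sent all 10 million messages in about 5.7 seconds (~1.7 million messages per second). Note that each worker's timer starts when it receives the start signal, so the worker figures include connection setup and any wait for the first message.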
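The rates quoted above are rounded, and they follow directly from the timings in the results list. A minimal sketch of that arithmetic is shown here; the `throughput()` helper is hypothetical and is not part of the benchmark script itself.

```php
<?php
// Timings copied from the results list above (seconds per 1,000,000 messages).
$workerSeconds = [6.778385, 6.769902, 7.063248, 6.603810, 7.295715,
                  6.916242, 7.018777, 6.845328, 6.775695, 7.020921];
$perWorker = 1000000;       // messages consumed by each worker
$parentSeconds = 5.735235;  // time the parent spent sending
$totalMessages = 10000000;  // messages sent by the parent

// Hypothetical helper (an assumption for illustration): messages per second.
function throughput($messages, $seconds)
{
    return $messages / $seconds;
}

$meanSeconds = array_sum($workerSeconds) / count($workerSeconds);
$workerRate  = throughput($perWorker, $meanSeconds);
$parentRate  = throughput($totalMessages, $parentSeconds);
$slowest     = max($workerSeconds);

printf("Mean worker time: %.3f s\n", $meanSeconds);
printf("Per-worker rate:  %.0f msg/s\n", $workerRate);
printf("Ventilator rate:  %.0f msg/s\n", $parentRate);
printf("Slowest worker:   %.3f s\n", $slowest);
```

The slowest worker time is the figure that bounds the end-to-end run, since the batch is only complete once every worker has consumed its share.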

<?php
/**
* This prototype implements a ventilator pattern using 10 forked workers to consume 10M messages from the queue in parallel.
*/
function doWork($wid, $maxWork)
{
    /* Pull up to $maxWork messages from the ventilator, hash each one, then return */
    $context = new ZMQContext();
    $worker = $context->getSocket(ZMQ::SOCKET_PULL);
    if ($worker->connect("ipc://routing.ipc")) {
        echo ". Worker $wid connected successfully to downstream!\n";
    } else {
        echo ". Worker $wid failed to connect...\n";
        return false;
    }
    $total = 0;
    while ($total < $maxWork) {
        $request = $worker->recv();
        /* Simulated work: the digest is computed and then discarded */
        $response = md5($request);
        $total++;
    }
    return true;
}
function sig_handler($sigNum)
{
    $pid = posix_getpid();
    switch ($sigNum) {
        case SIGUSR1:
            echo "> Stable signal received from parent in child pid $pid\n";
            break;
        default:
            echo "> Caught unknown signal in child pid $pid\n";
    }
}
$numWorkers = 10;
$totalWork = 10000000;
$maxWork = (int) ceil($totalWork / $numWorkers); /* messages each worker consumes */
$workers = [];
foreach (range(1, $numWorkers) as $wid) {
    $pid = pcntl_fork();
    if ($pid == -1) {
        /* Fork failed */
        echo "- Worker $wid failed to fork...\n";
    } elseif ($pid) {
        /* Fork successful - still in parent context */
        echo "* Worker $wid with pid $pid forked successfully!\n";
        $workers[$wid] = $pid;
    } else {
        /* Fork successful - now in child context */
        /* Register the signal handler for workers */
        pcntl_signal(SIGUSR1, "sig_handler");
        /* Wait for parent process to bind before attempting to connect */
        $work = false;
        pcntl_sigprocmask(SIG_BLOCK, array(SIGUSR1));
        $sig = pcntl_sigwaitinfo(array(SIGUSR1), $info);
        if ($sig == SIGUSR1) {
            echo "> Worker $wid received signal to connect...\n";
            $t = microtime(true);
            $work = doWork($wid, $maxWork);
            $t = microtime(true) - $t;
            file_put_contents("worker.$wid", sprintf("# Worker $wid handled $maxWork messages in %.6f seconds\n", $t));
        }
        $worked = $work ? ' with work completed' : ' without any work completed';
        echo "+ Worker $wid now exiting{$worked}...\n";
        exit;
    }
}
$context = new ZMQContext();
/* Bind the ventilator client */
$client = new ZMQSocket($context, ZMQ::SOCKET_PUSH);
$client->bind("ipc://routing.ipc");
/* Signal the child workers to connect */
foreach ($workers as $wid => $pid) {
    if (posix_kill($pid, SIGUSR1)) {
        echo "+ Signal sent to worker $wid with pid $pid successfully!\n";
    } else {
        echo "+ Failed to send signal to worker $wid with pid $pid...\n";
    }
}
/* Give the workers a chance to connect before sending */
sleep(2);
$t = microtime(true);
for ($request = 1; $request <= $totalWork; $request++) {
    $client->send($request);
}
$t = microtime(true) - $t;
file_put_contents("parent", sprintf("%% Parent sent $totalWork messages in %.6f seconds\n", $t));
/*
In some cases you might need to wait for workers to exit before exiting the parent process if some
workers are slow to consume.
*/
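/*
A minimal sketch of such a wait. This is an addition for illustration and was not
part of the original benchmark run: it blocks until every forked worker recorded
in $workers has exited, so the parent does not finish before slow consumers do.
*/
foreach ($workers as $wid => $pid) {
    pcntl_waitpid($pid, $status);
    echo "+ Worker $wid with pid $pid has exited\n";
}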