Skip to content

Instantly share code, notes, and snippets.

@mcustiel
Last active January 5, 2020 12:35
Show Gist options
  • Save mcustiel/1743d622d7e404d4718b469a4b901456 to your computer and use it in GitHub Desktop.
Save mcustiel/1743d622d7e404d4718b469a4b901456 to your computer and use it in GitHub Desktop.
Php Parallel Processing
<?php
namespace Mcustiel\Parallel\Arrays;
/**
 * Applies a callable to every element of an iterable, running each call in its
 * own forked child process and keeping at most a fixed number of children alive
 * at the same time.
 *
 * Each child sends its serialized return value to the parent over one end of a
 * Unix socket pair. The parent polls its children without blocking, accumulates
 * whatever bytes have arrived so far, and decodes a child's payload only once,
 * after that child has terminated.
 *
 * Requires the pcntl extension (pcntl_fork / pcntl_waitpid).
 */
class IterableParallelProcessing
{
    /**
     * Index, inside the array returned by stream_socket_pair(), of the end the
     * PARENT keeps open and reads from (the child closes this one).
     */
    const CHILD_STREAM_INDEX = 1;

    /**
     * Index, inside the array returned by stream_socket_pair(), of the end the
     * CHILD keeps open and writes to (the parent closes this one).
     */
    const PARENT_STREAM_INDEX = 0;

    /** Pause, in microseconds, between two polling passes that reaped nothing. */
    const PROCESSES_WAIT_MICROSECONDS = 3000;

    /** Maximum number of bytes requested per fread() call while draining a socket. */
    private const READ_CHUNK_BYTES = 8192;

    /** @var int Maximum number of child processes allowed to run concurrently. */
    private $parallelCount;

    /**
     * @param int $parallelCount Maximum number of concurrent children; must be >= 1.
     *
     * @throws \InvalidArgumentException When $parallelCount is lower than 1. Such a
     *                                   value could never free a slot and would make
     *                                   execute() wait forever.
     */
    public function __construct(int $parallelCount)
    {
        if ($parallelCount < 1) {
            throw new \InvalidArgumentException(
                sprintf('Parallel count must be at least 1, %d given', $parallelCount)
            );
        }
        $this->parallelCount = $parallelCount;
    }

    /**
     * Runs $function($key, $value) in a forked child for every element of $iterable.
     *
     * @param iterable $iterable Elements to process. Keys must be usable as array
     *                           keys and should be unique, because they index the
     *                           result.
     * @param callable $function Invoked in the child as $function($key, $value); its
     *                           return value must survive serialize()/unserialize().
     * @param callable $callback Invoked once in the parent with the complete result
     *                           array after every child has finished.
     *
     * @return array Results indexed by the iterable's keys, in iteration order. An
     *               entry is false when the child produced no decodable payload.
     *
     * @throws \RuntimeException When a socket pair cannot be created or the fork
     *                           fails. Children that are already running are reaped
     *                           before the exception is thrown.
     */
    public function execute(iterable $iterable, callable $function, callable $callback)
    {
        // Per-key bookkeeping for running children: ['pid' => int, 'stream' => resource, 'buffer' => string].
        $children = [];
        $result = [];

        foreach ($iterable as $key => $value) {
            $sockets = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
            if ($sockets === false) {
                $this->waitForRemainingProcessesToFinish($children, $result);
                throw new \RuntimeException('Could not create socket pair');
            }

            $pid = pcntl_fork();
            if ($pid === -1) {
                fclose($sockets[self::PARENT_STREAM_INDEX]);
                fclose($sockets[self::CHILD_STREAM_INDEX]);
                $this->waitForRemainingProcessesToFinish($children, $result);
                throw new \RuntimeException('Could not fork');
            }

            if ($pid === 0) {
                // Child process: this call never returns.
                $this->runFunctionInChildProcess($function, $sockets, $key, $value);
            }

            $this->setUpParentStreams($sockets);
            $children[$key] = [
                'pid' => $pid,
                'stream' => $sockets[self::CHILD_STREAM_INDEX],
                'buffer' => '',
            ];
            // Reserve the slot now so the result keeps the iteration order no
            // matter which child finishes first.
            $result[$key] = false;

            $this->waitForFreeSlots($children, $result);
        }

        $this->waitForRemainingProcessesToFinish($children, $result);
        $callback($result);

        return $result;
    }

    /**
     * Blocks (by polling) until fewer than $parallelCount children are running.
     */
    private function waitForFreeSlots(array &$children, array &$result): void
    {
        while (count($children) >= $this->parallelCount) {
            $this->waitForProcesses($children, $result);
        }
    }

    /**
     * Blocks (by polling) until every tracked child has terminated.
     */
    private function waitForRemainingProcessesToFinish(array &$children, array &$result): void
    {
        while (count($children) > 0) {
            $this->waitForProcesses($children, $result);
        }
    }

    /**
     * Parent side of the fork: drops the child's end of the pair and switches the
     * parent's end to non-blocking mode so polling never stalls on a slow child.
     */
    private function setUpParentStreams(array $sockets): void
    {
        fclose($sockets[self::PARENT_STREAM_INDEX]);
        stream_set_blocking($sockets[self::CHILD_STREAM_INDEX], false);
    }

    /**
     * Child side of the fork: runs the user function, sends its serialized return
     * value to the parent and terminates the process.
     *
     * A throwable raised by $function is logged and turned into exit status 1, so
     * it can never unwind into the caller's stack and let the child continue
     * executing the parent's code.
     */
    private function runFunctionInChildProcess(callable $function, array $sockets, $key, $value): void
    {
        fclose($sockets[self::CHILD_STREAM_INDEX]);
        $exitCode = 0;
        try {
            $return = $function($key, $value);
            fwrite($sockets[self::PARENT_STREAM_INDEX], serialize($return));
        } catch (\Throwable $error) {
            error_log((string) $error);
            $exitCode = 1;
        }
        fclose($sockets[self::PARENT_STREAM_INDEX]);
        exit($exitCode);
    }

    /**
     * Performs one polling pass: collects pending output of every running child
     * and reaps the ones that have terminated, storing their decoded result.
     */
    private function waitForProcesses(array &$children, array &$result): void
    {
        $reapedAny = false;

        foreach ($children as $key => $child) {
            // Keep draining so a child writing more than the socket buffer holds
            // is never stuck on a full pipe.
            $children[$key]['buffer'] .= $this->readAvailable($child['stream']);

            $status = 0;
            $waitResult = pcntl_waitpid($child['pid'], $status, WNOHANG);
            if ($waitResult === 0) {
                continue; // Still running.
            }

            // The child exited (> 0) or can no longer be waited for (-1). Read
            // once more: it may have written between the drain above and its exit.
            $payload = $children[$key]['buffer'] . $this->readAvailable($child['stream']);
            fclose($child['stream']);
            unset($children[$key]);

            $result[$key] = $this->decodeResult($payload);
            $reapedAny = true;
        }

        if (!$reapedAny) {
            usleep(self::PROCESSES_WAIT_MICROSECONDS);
        }
    }

    /**
     * Reads everything currently available on a non-blocking stream.
     *
     * @param resource $stream
     */
    private function readAvailable($stream): string
    {
        $data = '';
        while (true) {
            $chunk = fread($stream, self::READ_CHUNK_BYTES);
            if ($chunk === false || $chunk === '') {
                break; // Nothing available right now, or end of stream.
            }
            $data .= $chunk;
        }

        return $data;
    }

    /**
     * Decodes the payload sent by a child.
     *
     * The payload is produced by serialize() in a process forked from this one, so
     * it is treated as trusted input for unserialize().
     *
     * @return mixed The child's return value, or false when the payload is empty
     *               or cannot be unserialized.
     */
    private function decodeResult(string $payload)
    {
        if ($payload === '') {
            return false;
        }

        return unserialize($payload);
    }
}
// Demo: process five integers with at most two concurrent child processes,
// then print the collected results and a few resource statistics.
$startedAt = microtime(true);
$items = [1, 2, 3, 4, 5];

// Runs in a child process: waits a random 1-5 seconds, then returns value + 1.
$worker = function($key, $value) {
    $delay = rand(1, 5);
    printf('Waiting %d seconds before processing %d => %d' . PHP_EOL, $delay, $key, $value);
    sleep($delay);
    printf('Processing %d => %d' . PHP_EOL, $key, $value);
    return $value + 1;
};

// Runs once in the parent process with the complete result array.
$report = function(array $result) {
    var_export($result);
    echo PHP_EOL;
};

(new IterableParallelProcessing(2))->execute($items, $worker, $report);

$peakKb = memory_get_peak_usage() / 1024.0;
$peakRealKb = memory_get_peak_usage(true) / 1024.0;
$loadLastMinute = sys_getloadavg()[0];
$elapsedSeconds = microtime(true) - $startedAt;

printf(
    'Memory usage: %0.3f Kb. Memory usage real: %0.3f Kb. Load average the last minute: %0.3f. Time: %0.3f seconds' . PHP_EOL,
    $peakKb,
    $peakRealKb,
    $loadLastMinute,
    $elapsedSeconds
);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment