Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Parallel processing in PHP using stream_select()
<?php
require_once('WorkerManager.php');
require_once('Worker.php');
/**
 * Demo worker whose command sleeps for a number of seconds and then echoes
 * that same number.
 */
class SleepThenEcho implements Worker
{
    /** @var int Number of seconds handed to `sleep` (and echoed afterwards). */
    private $time;

    /**
     * @param mixed $time Sleep duration; coerced to an integer, so the value
     *                    interpolated into the shell command is always numeric.
     */
    public function __construct($time)
    {
        $this->time = (int) $time;
    }

    /**
     * Builds the shell command line for this job.
     *
     * @return string
     */
    public function getCommand()
    {
        return "sleep {$this->time} && echo {$this->time}";
    }

    /**
     * Prints a one-line "done" report with the command and its captured output.
     *
     * @param string $stdout Captured standard output.
     * @param string $stderr Captured standard error.
     */
    public function done($stdout, $stderr)
    {
        $this->report('done', array(
            'command' => $this->getCommand(),
            'stdout' => $stdout,
            'stderr' => $stderr,
        ));
    }

    /**
     * Prints a one-line "fail" report with the command, its captured output
     * and the exit status.
     *
     * @param string $stdout Captured standard output.
     * @param string $stderr Captured standard error.
     * @param int    $status Exit status of the command.
     */
    public function fail($stdout, $stderr, $status)
    {
        $this->report('fail', array(
            'command' => $this->getCommand(),
            'stdout' => $stdout,
            'stderr' => $stderr,
            'status' => $status,
        ));
    }

    /**
     * Writes "<label> <var_export of $details>" on a single line.
     *
     * @param string $label   Leading word of the report line.
     * @param array  $details Values to dump.
     */
    private function report($label, array $details)
    {
        $dump = var_export($details, true);
        // var_export() is multi-line; flatten every kind of line break to a space.
        $flattened = str_replace(array("\r\n", "\n", "\r"), ' ', $dump);
        echo $label . ' ' . $flattened . PHP_EOL;
    }
}
// Launch one worker per duration from -1 to 9 seconds; all commands run concurrently.
// (In the sample run, the -1 entry is the one reported through fail().)
$manager = new WorkerManager();
foreach (range(-1, 9) as $seconds) {
    $manager->attach(new SleepThenEcho($seconds));
}

// Poll until the manager reports that no workers remain attached.
while (count($manager) > 0) {
    $manager->listen();
}
% php example.php
fail array ( 'command' => 'sleep -1 && echo -1', 'stdout' => '', 'stderr' => 'sleep: invalid option -- \'1\' Try \'sleep --help\' for more information. ', 'status' => 1, )
done array ( 'command' => 'sleep 0 && echo 0', 'stdout' => '0 ', 'stderr' => '', )
done array ( 'command' => 'sleep 1 && echo 1', 'stdout' => '1 ', 'stderr' => '', )
done array ( 'command' => 'sleep 2 && echo 2', 'stdout' => '2 ', 'stderr' => '', )
done array ( 'command' => 'sleep 3 && echo 3', 'stdout' => '3 ', 'stderr' => '', )
done array ( 'command' => 'sleep 4 && echo 4', 'stdout' => '4 ', 'stderr' => '', )
done array ( 'command' => 'sleep 5 && echo 5', 'stdout' => '5 ', 'stderr' => '', )
done array ( 'command' => 'sleep 6 && echo 6', 'stdout' => '6 ', 'stderr' => '', )
done array ( 'command' => 'sleep 7 && echo 7', 'stdout' => '7 ', 'stderr' => '', )
done array ( 'command' => 'sleep 8 && echo 8', 'stdout' => '8 ', 'stderr' => '', )
done array ( 'command' => 'sleep 9 && echo 9', 'stdout' => '9 ', 'stderr' => '', )
<?php
/**
 * A unit of work that WorkerManager runs as an external command.
 *
 * The manager starts the command returned by getCommand() and later reports
 * the outcome through exactly one of done() or fail().
 */
interface Worker
{
/**
 * Returns the command line to execute; WorkerManager::attach() passes it to proc_open().
 *
 * @return string
 */
public function getCommand();
/**
 * Called by WorkerManager::listen() when the command exited with status 0.
 *
 * @param string $stdout Data read from the command's standard output.
 * @param string $stderr Data read from the command's standard error.
 */
public function done($stdout, $stderr);
/**
 * Called by WorkerManager::listen() when the command exited with a status greater than 0.
 *
 * @param string $stdout Data read from the command's standard output.
 * @param string $stderr Data read from the command's standard error.
 * @param int    $status Exit status of the command.
 */
public function fail($stdout, $stderr, $status);
}
<?php
/**
 * Runs the commands of attached Worker objects as parallel child processes
 * and multiplexes their output with stream_select().
 *
 * Each attached worker owns one process plus its three pipes. Output is
 * accumulated per worker across listen() calls; once both the stdout and the
 * stderr pipe have reached end-of-file the process is reaped and the worker is
 * notified through done() (exit status 0) or fail() (exit status > 0).
 *
 * count($manager) is the number of workers still attached.
 */
class WorkerManager implements Countable
{
    const STDIN = 0;
    const STDOUT = 1;
    const STDERR = 2;
    const NON_BLOCKING = 0;
    const BLOCKING = 1;

    /** @var array Descriptor spec handed to proc_open(): one pipe per standard stream. */
    private static $DESCRIPTORSPEC = array(
        self::STDIN => array('pipe', 'r'),
        self::STDOUT => array('pipe', 'w'),
        self::STDERR => array('pipe', 'w'),
    );

    // All arrays below share the same integer key per attached worker.
    private $workers = array();
    private $processes = array();
    private $stdins = array();
    private $stdouts = array();
    private $stderrs = array();
    /** @var string[] Standard output collected so far, per worker. */
    private $stdoutBuffers = array();
    /** @var string[] Standard error collected so far, per worker. */
    private $stderrBuffers = array();

    /**
     * Starts the worker's command and begins tracking it.
     *
     * @param Worker $worker Job whose getCommand() result is executed.
     *
     * @throws \RuntimeException When the process cannot be started.
     */
    public function attach(Worker $worker)
    {
        $command = $worker->getCommand();
        $process = proc_open($command, self::$DESCRIPTORSPEC, $pipes);
        if (false === is_resource($process)) {
            throw new \RuntimeException('Failed to start command: ' . $command);
        }

        // Both output pipes must be non-blocking: listen() drains whichever one
        // is readable and must never stall on the other.
        stream_set_blocking($pipes[self::STDOUT], self::NON_BLOCKING);
        stream_set_blocking($pipes[self::STDERR], self::NON_BLOCKING);

        // Append the worker, then reuse its key so every array stays aligned.
        $this->workers[] = $worker;
        end($this->workers);
        $i = key($this->workers);

        $this->processes[$i] = $process;
        $this->stdins[$i] = $pipes[self::STDIN];
        $this->stdouts[$i] = $pipes[self::STDOUT];
        $this->stderrs[$i] = $pipes[self::STDERR];
        $this->stdoutBuffers[$i] = '';
        $this->stderrBuffers[$i] = '';
    }

    /**
     * Waits up to $timeout microseconds for output, collects it, and notifies
     * every worker whose command has closed both of its output pipes.
     *
     * Returns immediately when no workers are attached.
     *
     * @param int $timeout Maximum wait in microseconds.
     *
     * @throws \RuntimeException When stream_select() fails, or when a finished
     *                           process reports a negative exit status.
     */
    public function listen($timeout = 200000)
    {
        if (0 === count($this->workers)) {
            // stream_select() rejects a call without any stream.
            return;
        }

        // Watch only pipes that are still open on the child's side; a pipe at
        // EOF is permanently "readable" and would turn the wait into a busy loop.
        $read = array();
        foreach ($this->workers as $i => $_) {
            foreach (array($this->stdouts[$i], $this->stderrs[$i]) as $stream) {
                if (!feof($stream)) {
                    $read[] = $stream;
                }
            }
        }

        if (0 < count($read)) {
            // stream_select() takes its arrays by reference, so real variables are required.
            $write = null;
            $except = null;
            $changed_num = stream_select($read, $write, $except, 0, $timeout);
            if (false === $changed_num) {
                throw new \RuntimeException('stream_select() failed');
            }
            foreach ($read as $stream) {
                $this->drain($stream);
            }
        }

        // Snapshot the keys: done()/fail() callbacks may attach or detach workers.
        foreach (array_keys($this->workers) as $i) {
            if (!isset($this->workers[$i])) {
                continue;
            }
            if (!feof($this->stdouts[$i]) || !feof($this->stderrs[$i])) {
                continue; // the command may still produce output
            }
            $this->finish($i);
        }
    }

    /**
     * Stops tracking a worker: closes its pipes and waits for its process.
     *
     * @param Worker $worker A currently attached worker.
     *
     * @return int Exit status as reported by proc_close().
     *
     * @throws \RuntimeException When the worker is not attached.
     */
    public function detach(Worker $worker)
    {
        $i = array_search($worker, $this->workers, true);
        if (false === $i) {
            throw new \RuntimeException('Worker is not attached to this manager');
        }

        return $this->release($i);
    }

    /**
     * @return int Number of workers still attached.
     */
    #[\ReturnTypeWillChange]
    public function count()
    {
        return count($this->workers);
    }

    /**
     * Closes the pipes of, and waits for, every process that is still attached.
     */
    public function __destruct()
    {
        foreach (array_keys($this->workers) as $i) {
            $this->release($i);
        }
    }

    /**
     * Appends whatever is currently available on $stream to its worker's buffer.
     *
     * @param resource $stream A stdout or stderr pipe returned by stream_select().
     */
    private function drain($stream)
    {
        $i = array_search($stream, $this->stdouts, true);
        if (false !== $i) {
            $this->stdoutBuffers[$i] .= (string) stream_get_contents($stream);
            return;
        }

        $i = array_search($stream, $this->stderrs, true);
        if (false !== $i) {
            $this->stderrBuffers[$i] .= (string) stream_get_contents($stream);
        }
    }

    /**
     * Reaps the process in slot $i and reports the result to its worker.
     *
     * @param int $i Slot key shared by all per-worker arrays.
     *
     * @throws \RuntimeException When the exit status is negative.
     */
    private function finish($i)
    {
        $worker = $this->workers[$i];
        $stdout = $this->stdoutBuffers[$i];
        $stderr = $this->stderrBuffers[$i];

        // Release by slot, not by worker identity, so a worker object that was
        // attached more than once is matched with the right process.
        $status = $this->release($i);

        if (0 === $status) {
            $worker->done($stdout, $stderr);
        } elseif (0 < $status) {
            $worker->fail($stdout, $stderr, $status);
        } else {
            throw new \RuntimeException('Could not determine the exit status of a finished command');
        }
    }

    /**
     * Closes the pipes of slot $i, waits for its process and forgets the slot.
     *
     * @param int $i Slot key shared by all per-worker arrays.
     *
     * @return int Exit status as reported by proc_close().
     */
    private function release($i)
    {
        foreach (array($this->stdins[$i], $this->stdouts[$i], $this->stderrs[$i]) as $pipe) {
            if (is_resource($pipe)) {
                fclose($pipe);
            }
        }
        $status = proc_close($this->processes[$i]);

        unset(
            $this->workers[$i],
            $this->processes[$i],
            $this->stdins[$i],
            $this->stdouts[$i],
            $this->stderrs[$i],
            $this->stdoutBuffers[$i],
            $this->stderrBuffers[$i]
        );

        return $status;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.