Skip to content

Instantly share code, notes, and snippets.

@ethanpil
Created June 12, 2017 14:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ethanpil/78e950942ef49bd254b40529b4b61f41 to your computer and use it in GitHub Desktop.
Save ethanpil/78e950942ef49bd254b40529b4b61f41 to your computer and use it in GitHub Desktop.
PHP Process Forks / Parallel Processing
<?php
// Source: https://www.stitcher.io/blog/process-forks
function async(Process $process) : Process {
socket_create_pair(AF_UNIX, SOCK_STREAM, 0, $sockets);
[$parentSocket, $childSocket] = $sockets;
if (($pid = pcntl_fork()) == 0) {
socket_close($childSocket);
socket_write($parentSocket, serialize($process->execute()));
socket_close($parentSocket);
exit;
}
socket_close($parentSocket);
return $process
->setStartTime(time())
->setPid($pid)
->setSocket($childSocket);
}
function wait(array $processes) : array {
$output = [];
while (count($processes)) {
foreach ($processes as $key => $process) {
$processStatus = pcntl_waitpid($process->getPid(), $status, WNOHANG | WUNTRACED);
if ($processStatus == $process->getPid()) {
$output[] = unserialize(socket_read($process->getSocket(), 4096));
socket_close($process->getSocket());
$process->triggerSuccess();
unset($processes[$key]);
} else if ($processStatus == 0) {
if ($process->getStartTime() + $process->getMaxRunTime() < time() || pcntl_wifstopped($status)) {
if (!posix_kill($process->getPid(), SIGKILL)) {
throw new \Exception("Failed to kill {$process->getPid()}: " . posix_strerror(posix_get_last_error()));
}
unset($processes[$key]);
}
} else {
throw new \Exception("Could not reliably manage process {$process->getPid()}");
}
}
if (!count($processes)) {
break;
}
usleep(100000);
}
return $output;
}
//The Process class, used to pass data in a defined way.
abstract class Process
{
protected $pid;
protected $name;
protected $socket;
protected $successCallback;
protected $startTime;
protected $maxRunTime = 300;
public abstract function execute();
public function onSuccess(callable $callback) : Process {
$this->successCallback = $callback;
return $this;
}
public function triggerSuccess() {
if (!$this->successCallback) {
return null;
}
return call_user_func_array($this->successCallback, [$this]);
}
public function setPid($pid) : Process {
$this->pid = $pid;
return $this;
}
public function getPid() {
return $this->pid;
}
public function setSocket($socket) : Process {
$this->socket = $socket;
return $this;
}
public function getSocket() {
return $this->socket;
}
public function setName(string $name) : Process {
$this->name = $name;
return $this;
}
public function getName() : string {
return $this->name;
}
public function setStartTime($startTime) {
$this->startTime = $startTime;
return $this;
}
public function getStartTime() {
return $this->startTime;
}
public function setMaxRunTime(int $maxRunTime) : Process {
$this->maxRunTime = $maxRunTime;
return $this;
}
public function getMaxRunTime() : int {
return $this->maxRunTime;
}
}
//A concrete Process implementation.
class MyProcess extends Process
{
public function execute() {
sleep(1);
return true;
}
}
// And bringing it all together.
$processA = async(new MyProcess());
$processB = async(new MyProcess());
$output = wait([$processA, $processB]);
print_r($output);
die('Done!');
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment