Skip to content

Instantly share code, notes, and snippets.

@nicholasnet
Created February 16, 2017 14:23
Show Gist options
  • Save nicholasnet/246a62c4c26593d73511b5f21e96d52e to your computer and use it in GitHub Desktop.
Save nicholasnet/246a62c4c26593d73511b5f21e96d52e to your computer and use it in GitHub Desktop.
<?php
declare(strict_types=1);
/**
* Copyright (c) nicholasnet
*/
namespace Ideasbucket;
use Symfony\Component\Process\Process;
/**
* Class ProcessManager
*
* @package Ideasbucket
*/
class ProcessManager
{
/**
* @var int
*/
private $concurrency = 20;
/**
* @var array
*/
private $processes = [];
/**
* @var \ArrayIterator
*/
private $inProcess;
/**
* @var Callable
*/
private $onSuccess;
/**
* @var Callable
*/
private $onError;
/**
* @var bool
*/
private $ran = false;
/**
* ProcessManager constructor.
*
* @param mixed $processes
* @param array $opts
*/
public function __construct($processes, array $opts)
{
$this->processes = $this->getIterator($processes);
$this->onSuccess = (isset($opts['success']) && is_callable($opts['success'])) ? $opts['success'] : $this->getNoOp();
$this->onError = (isset($opts['error']) && is_callable($opts['error'])) ? $opts['error'] : $this->getNoOp();
}
/**
* Returns an iterator for the given value.
*
* @param mixed $value
*
* @return \Iterator
*/
private function getIterator($value)
{
if ($value instanceof \Iterator) {
return $value;
}
if (is_array($value)) {
return new \ArrayIterator($value);
}
return new \ArrayIterator([$value]);
}
/**
* @return \Closure
*/
private function getNoOp() : \Closure
{
return function($response, $index) {};
}
/**
* @param int $concurrency
*
* @return ProcessManager
*
* @throws \LogicException
*/
public function setConcurrency(int $concurrency): ProcessManager
{
if ($this->ran === true) {
throw new \LogicException('Cannot change concurrency after process run.');
}
if ($concurrency <= 0) {
throw new \InvalidArgumentException('Concurrency cannot be 0 or negative value.');
}
$this->concurrency = $concurrency;
return $this;
}
/**
* Executes the processes.
*/
public function run()
{
$this->addPending();
$this->ran = true;
while (count($this->inProcess) !== 0) {
$this->monitorPending();
}
// Clear the references for callbacks.
$this->onSuccess = null;
$this->onError = null;
}
/**
* Adds the pending
*/
private function addPending()
{
while (count($this->inProcess) < $this->concurrency) {
$item = $this->getUnprocessed();
if (empty($item)) {
break;
}
$this->inProcess[$item['index']] = $item['process'];
$item['process']->start();
if (count($this->inProcess) >= $this->concurrency) {
break;
}
}
}
/**
* @return array|null
*/
private function getUnprocessed()
{
if (!$this->processes || !$this->processes->valid()) {
return null;
}
$currentProcess = $this->processes->current();
if (($currentProcess instanceof Process) === false) {
$this->processes->next();
return $this->getUnprocessed();
}
$data = ['index' => $this->processes->key(), 'process' => $currentProcess];
$this->processes->next();
return $data;
}
/**
* Monitors the pending process.
*/
private function monitorPending()
{
foreach ($this->inProcess as $index => $process) {
/** @var Process $process */
if ($process->isTerminated()) {
if ($process->isSuccessful()) {
call_user_func($this->onSuccess, $process->getOutput(), $index);
} else {
call_user_func($this->onError, $process->getErrorOutput(), $index);
}
unset($this->inProcess[$index]);
$this->addPending();
}
}
}
}
// Example Usage
$requests = function ($total) {
for ($i = 0; $i < $total; $i++) {
yield new Process('ls -lsa');
}
};
(new \Ideasbucket\ProcessManager($requests(100), [
'error' => function ($response, $index) {},
'success' => function ($response, $index) {}
]))->run();
// OR
$processes = [];
for ($i = 0; $i < 100; $i++) {
$process[] = new Process('ls -lsa');
}
(new \Ideasbucket\ProcessManager($processes, [
'error' => function ($response, $index) {},
'success' => function ($response, $index) {}
]))->setConcurrency(10)->run();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment