Created
February 3, 2020 11:18
-
-
Save dinamic/7994bc7d48d363c7603a4f6e2eced78c to your computer and use it in GitHub Desktop.
Draft of process manager class using Swoole
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
declare(strict_types=1); | |
namespace Petkanski\Component\ProcessManager\Service; | |
use LogicException; | |
use Swoole\Process; | |
use Swoole\Table; | |
/**
 * Class ProcessManager
 *
 * Runs a batch of callbacks inside Swoole child processes and gathers their
 * return values. Jobs and control messages travel over the processes' message
 * queues as serialized arrays; results are written by the workers into a
 * Swoole\Table and read back by the parent.
 *
 * @package Petkanski\Component\ProcessManager\Service
 */
class ProcessManager
{
    /**
     * Size, in bytes, of the string column that holds one serialized result.
     */
    private const RESULT_COLUMN_SIZE = 9999999;

    /**
     * Worker processes that have been started and not yet acknowledged exit.
     *
     * @var Process[]
     */
    private $workers = [];

    /**
     * Callbacks of the current run, keyed exactly as passed to execute().
     *
     * @var callable[]
     */
    private $callbacks = [];

    /**
     * Table the workers write their serialized results into.
     *
     * @var Table
     */
    private $results;

    /**
     * Number of worker processes for the current run.
     *
     * @var int
     */
    private $concurrency;

    /**
     * Executes every callback in a worker process and returns the results.
     *
     * Each callback is invoked with the worker's Process instance as its only
     * argument; its return value is serialized in the worker and unserialized
     * in the parent.
     *
     * @param callable[] $callbacks   Callbacks to run; any array keys are accepted.
     * @param int        $concurrency Number of worker processes to spawn.
     *
     * @return array List of callback results, in the order of $callbacks.
     *
     * @throws LogicException    When $concurrency is not positive or a worker
     *                           answers with an unexpected command.
     * @throws \RuntimeException When the result table or a worker cannot be
     *                           created, or a queue read fails.
     */
    public function execute(array $callbacks, int $concurrency = 3): array
    {
        // Reject bad input before any shared memory is allocated or a process is forked.
        if ($concurrency <= 0) {
            throw new LogicException('Invalid instance count');
        }

        // Nothing to run: skip the table and the workers entirely.
        if ($callbacks === []) {
            return [];
        }

        $this->callbacks = $callbacks;
        $this->concurrency = $concurrency;

        $this->results = new Table(count($callbacks));
        $this->results->column('result', Table::TYPE_STRING, self::RESULT_COLUMN_SIZE);
        if ($this->results->create() === false) {
            throw new \RuntimeException('Unable to create the shared result table');
        }

        return $this
            ->spawnWorkers($this->concurrency)
            ->loadJobs()
            ->waitWorkers()
            ->getResult();
    }

    /**
     * Reads the stored results back in callback order and releases the table.
     *
     * Rows are looked up by callback key instead of iterating the table, so the
     * returned list does not depend on the table's internal row order.
     * Callbacks for which no row was stored are skipped.
     *
     * @return array
     */
    private function getResult(): array
    {
        $results = [];

        foreach (array_keys($this->callbacks) as $key) {
            $payload = $this->results->get((string) $key, 'result');
            if (!is_string($payload)) {
                continue;
            }

            // Results may legitimately be objects, so classes are not restricted here.
            $results[] = unserialize($payload);
        }

        $this->results->destroy();

        return $results;
    }

    /**
     * Blocks until an "exit" message has been received for every worker.
     *
     * NOTE(review): the children are never reaped with Process::wait() —
     * confirm whether that is required to avoid defunct processes here.
     *
     * @return self
     *
     * @throws LogicException    When a message other than "exit" is received.
     * @throws \RuntimeException When reading from the queue fails.
     */
    private function waitWorkers(): self
    {
        foreach ($this->workers as $key => $worker) {
            $data = self::decodeMessage($worker->pop());

            if ($data['command'] !== 'exit') {
                throw new LogicException('Unknown command');
            }

            unset($this->workers[$key]);
        }

        return $this;
    }

    /**
     * Queues one "process" message per callback, then one "exit" per worker.
     *
     * Jobs are dealt out round-robin by position in the callback list, so
     * string, negative or non-contiguous callback keys are all handled.
     *
     * @return self
     */
    private function loadJobs(): self
    {
        $workers = array_values($this->workers);
        $workerCount = count($workers);

        foreach (array_keys($this->callbacks) as $position => $key) {
            $workers[$position % $workerCount]->push(serialize([
                'command' => 'process',
                'id' => $key,
            ]));
        }

        // Short pause kept from the original draft; presumably it lets workers
        // start draining jobs before the exit markers are queued — TODO confirm.
        usleep(500);

        foreach ($workers as $worker) {
            $worker->push(serialize([
                'command' => 'exit',
            ]));
        }

        return $this;
    }

    /**
     * Starts the requested number of worker processes.
     *
     * NOTE(review): useQueue() is called without an explicit key for every
     * worker — confirm in the Swoole documentation whether this yields one
     * queue per worker or a single queue shared by all of them.
     *
     * @param int $instances Number of processes to start.
     *
     * @return self
     *
     * @throws LogicException    When $instances is not positive.
     * @throws \RuntimeException When a process fails to start.
     */
    private function spawnWorkers(int $instances): self
    {
        if ($instances <= 0) {
            throw new LogicException('Invalid instance count');
        }

        $spawned = [];

        for ($i = 0; $i < $instances; $i++) {
            $process = new Process(function (Process $worker): void {
                $this->runWorker($worker);
            }, false, 0);
            $process->useQueue();

            if ($process->start() === false) {
                throw new \RuntimeException('Unable to start a worker process');
            }

            $spawned[] = $process;
        }

        $this->workers = $spawned;

        return $this;
    }

    /**
     * Worker main loop: runs queued callbacks until an "exit" message arrives.
     *
     * On "exit" the worker pushes an "exit" message of its own and terminates
     * the process with status 0.
     *
     * @param Process $worker The process this loop is running in.
     *
     * @return void
     *
     * @throws LogicException When an unknown command is received.
     */
    private function runWorker(Process $worker): void
    {
        while (true) {
            $data = self::decodeMessage($worker->pop());

            if ($data['command'] === 'exit') {
                // printf("{$worker->pid} exiting\n");
                $worker->push(serialize([
                    'command' => 'exit',
                ]));
                exit(0);
            }

            if ($data['command'] !== 'process') {
                throw new LogicException('Unknown command');
            }

            $callbackId = $data['id'];
            $callback = $this->callbacks[$callbackId];

            $this->results->set((string) $callbackId, [
                'result' => serialize($callback($worker)),
            ]);
        }
    }

    /**
     * Decodes one raw queue message into its command array.
     *
     * @param mixed $raw Value returned by Process::pop().
     *
     * @return array Decoded message; always contains a "command" entry.
     *
     * @throws \RuntimeException When the queue read itself failed.
     * @throws LogicException    When the payload is not a command array.
     */
    private static function decodeMessage($raw): array
    {
        if (!is_string($raw)) {
            throw new \RuntimeException('Unable to read a message from the process queue');
        }

        // Control messages only ever carry scalars, so object instantiation is disabled.
        $data = unserialize($raw, ['allowed_classes' => false]);

        if (!is_array($data) || !isset($data['command'])) {
            throw new LogicException('Unknown command');
        }

        return $data;
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment