Skip to content

Instantly share code, notes, and snippets.

@dinamic
Created February 3, 2020 11:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dinamic/7994bc7d48d363c7603a4f6e2eced78c to your computer and use it in GitHub Desktop.
Save dinamic/7994bc7d48d363c7603a4f6e2eced78c to your computer and use it in GitHub Desktop.
Draft of process manager class using Swoole
<?php
declare(strict_types=1);
namespace Petkanski\Component\ProcessManager\Service;
use LogicException;
use Swoole\Process;
use Swoole\Table;
/**
 * Runs a batch of callbacks in parallel across forked Swoole worker processes.
 *
 * Jobs are handed to the workers through a Swoole message queue, and every
 * callback's return value travels back to the parent, serialized, through a
 * shared-memory Swoole\Table that is created before the workers are forked.
 *
 * @package Petkanski\Component\ProcessManager\Service
 */
class ProcessManager
{
    /**
     * Size in bytes reserved for one serialized callback result.
     */
    private const RESULT_COLUMN_SIZE = 9999999;

    /**
     * Worker processes spawned for the batch currently being executed.
     *
     * @var Process[]
     */
    private $workers = [];

    /**
     * Callbacks of the current batch, re-indexed by position (0..n-1).
     *
     * @var callable[]
     */
    private $callbacks = [];

    /**
     * Shared table that receives one serialized result row per job.
     *
     * @var Table|null
     */
    private $results;

    /**
     * Number of worker processes requested for the current batch.
     *
     * @var int
     */
    private $concurrency = 0;

    /**
     * Executes every callback in a worker process and collects the return values.
     *
     * Each callback is invoked with the Swoole\Process of the worker running it,
     * and its return value is passed through serialize()/unserialize().
     *
     * @param callable[] $callbacks   Callbacks to run; array keys are ignored, order is kept.
     * @param int        $concurrency Upper bound on the number of worker processes.
     *
     * @return array Return values indexed by the position of their callback in
     *               $callbacks. A job that stored no result (for instance because
     *               its worker died) leaves a gap at its position.
     *
     * @throws LogicException When $concurrency is not positive or no worker could be started.
     */
    public function execute(array $callbacks, int $concurrency = 3): array
    {
        if ($concurrency <= 0) {
            throw new LogicException('Invalid instance count');
        }

        // Nothing to run: avoid creating a table and forking idle workers.
        if ($callbacks === []) {
            return [];
        }

        // Jobs are addressed by position so string, negative or sparse keys
        // can neither break the worker routing nor the table row keys.
        $this->callbacks = array_values($callbacks);
        $jobCount = count($this->callbacks);

        // More workers than jobs would only sit idle.
        $this->concurrency = min($concurrency, $jobCount);

        // The table has to exist before forking so parent and workers share it.
        // NOTE(review): Swoole may store fewer rows than the requested size when
        // keys collide; confirm the sizing for large batches.
        $this->results = new Table($jobCount);
        $this->results->column('result', Table::TYPE_STRING, self::RESULT_COLUMN_SIZE);
        $this->results->create();

        try {
            return $this
                ->spawnWorkers($this->concurrency)
                ->loadJobs()
                ->waitWorkers()
                ->getResult();
        } finally {
            // Release the shared memory and per-batch state even on failure.
            $this->results->destroy();
            $this->results = null;
            $this->workers = [];
            $this->callbacks = [];
        }
    }

    /**
     * Reads the stored results back from the shared table.
     *
     * @return array Unserialized results keyed by job position, in job order.
     */
    private function getResult(): array
    {
        $results = [];

        foreach (array_keys($this->callbacks) as $position) {
            $row = $this->results->get((string) $position);
            if ($row === false) {
                // No row was stored for this job; leave its position empty.
                continue;
            }

            $results[$position] = unserialize($row['result']);
        }

        return $results;
    }

    /**
     * Blocks until every spawned worker process has exited and been reaped.
     *
     * Waiting on process exit (instead of popping an acknowledgement from the
     * queue) keeps the parent from consuming messages meant for the workers and
     * prevents finished workers from lingering as zombie processes.
     *
     * @return self
     */
    private function waitWorkers(): self
    {
        $pending = [];
        foreach ($this->workers as $worker) {
            $pending[$worker->pid] = true;
        }

        while ($pending !== []) {
            $status = Process::wait(true);
            if ($status === false) {
                // No child could be waited for, so nothing more will be reported.
                break;
            }

            // wait() reports any exited child; only our own pids are tracked.
            unset($pending[$status['pid']]);
        }

        $this->workers = [];

        return $this;
    }

    /**
     * Queues one "process" command per callback followed by one "exit" command
     * per worker.
     *
     * @return self
     */
    private function loadJobs(): self
    {
        $workerCount = count($this->workers);

        foreach (array_keys($this->callbacks) as $position) {
            $this->workers[$position % $workerCount]->push(serialize([
                'command' => 'process',
                'id' => $position,
            ]));
        }

        // Pushed after all jobs: this relies on the queue handing messages out
        // in the order they were pushed, so a worker only stops once the jobs
        // queued ahead of its exit command have been taken.
        foreach ($this->workers as $worker) {
            $worker->push(serialize([
                'command' => 'exit',
            ]));
        }

        return $this;
    }

    /**
     * Forks the worker processes and attaches a message queue to each of them.
     *
     * @param int $instances Number of workers to start.
     *
     * @return self
     *
     * @throws LogicException When $instances is not positive or no worker could be started.
     */
    private function spawnWorkers(int $instances): self
    {
        if ($instances <= 0) {
            throw new LogicException('Invalid instance count');
        }

        $this->workers = [];

        for ($i = 0; $i < $instances; $i++) {
            $process = new Process(function (Process $worker): void {
                $this->runWorker($worker);
            }, false, 0);
            $process->useQueue();

            if ($process->start() === false) {
                // Forking failed; carry on with the workers started so far.
                break;
            }

            $this->workers[] = $process;
        }

        if ($this->workers === []) {
            throw new LogicException('Unable to start any worker process');
        }

        return $this;
    }

    /**
     * Main loop of a worker process: runs queued jobs until told to exit.
     *
     * This method only ever runs inside a forked child and never returns; it
     * ends the child process through exit() or an uncaught exception.
     *
     * @param Process $worker Handle of the worker process executing the loop.
     *
     * @throws LogicException When a message carries an unknown command.
     */
    private function runWorker(Process $worker): void
    {
        while (true) {
            $payload = $worker->pop();
            if (!is_string($payload)) {
                // The queue could not be read, so no further command can arrive.
                exit(1);
            }

            // Commands are plain arrays of scalars; never instantiate objects.
            $data = unserialize($payload, ['allowed_classes' => false]);
            $command = is_array($data) ? ($data['command'] ?? null) : null;

            if ($command === 'exit') {
                exit(0);
            }

            if ($command !== 'process') {
                throw new LogicException('Unknown command');
            }

            $callbackId = $data['id'];
            $callback = $this->callbacks[$callbackId];

            $this->results->set((string) $callbackId, [
                'result' => serialize($callback($worker)),
            ]);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment