-
-
Save humbertodosreis/ea0dd5e19c15830b750d71437a0f7a9a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
// bin/swoole_consume.php | |
use Symfony\Component\Filesystem\LockHandler; | |
use Symfony\Component\Process\PhpExecutableFinder; | |
require_once __DIR__.'/../vendor/autoload.php'; | |
// Resolve the PHP CLI binary that every worker command will be launched with.
$finder = new PhpExecutableFinder();
$phpBin = $finder->find();
if ($phpBin === false) {
    throw new \LogicException('Php executable could not be found');
}

// Both worker kinds run the console script that sits next to this file.
$consoleScript = __DIR__ . '/console';

// Register one "quartz:scheduler" worker and three "enqueue:consume" workers,
// then hand control over to the daemon.
$daemon = new \App\Infra\SwooleDaemon();
$daemon->addWorker(1, $phpBin, [$consoleScript, 'quartz:scheduler', '-vvv']);
$daemon->addWorker(3, $phpBin, [$consoleScript, 'enqueue:consume', '--setup-broker', '-vvv']);
$daemon->run();
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
namespace App\Infra; | |
/**
 * Master process that spawns worker commands through swoole_process, forwards
 * their output to its own stdout and restarts workers that exit.
 *
 * Signals handled by the master:
 *  - SIGCHLD: reap exited workers and start replacements (unless shutting down);
 *  - SIGUSR1: send SIGTERM to every worker (they are then restarted via SIGCHLD);
 *  - SIGUSR2: print the registered worker definitions and the live workers;
 *  - SIGTERM / SIGQUIT / SIGINT: stop all workers, wait for them, then exit.
 */
class SwooleDaemon
{
    /**
     * Worker processes keyed by the descriptor of their pipe to the master.
     *
     * @var \swoole_process[]
     */
    private $pipes = [];

    /**
     * Worker processes keyed by pid.
     *
     * @var \swoole_process[]
     */
    private $workers = [];

    /**
     * True once a terminating signal was received; exited workers are no
     * longer restarted from that point on.
     *
     * @var bool
     */
    private $exiting = false;

    /**
     * Registered worker definitions keyed by a hash of their content.
     * Each entry has the keys 'quantity', 'execFile' and 'arguments'.
     *
     * @var array
     */
    private $workersData = [];

    /**
     * Registers a worker definition; nothing is started until run() is called.
     *
     * Definitions are keyed by the md5 of their serialized content, so
     * registering an identical definition twice keeps a single entry.
     *
     * @param int      $quantity  number of processes to start for this definition
     * @param string   $execFile  executable passed to swoole_process::exec()
     * @param string[] $arguments arguments passed to the executable
     */
    public function addWorker(int $quantity, string $execFile, array $arguments): void
    {
        $data = ['quantity' => $quantity, 'execFile' => $execFile, 'arguments' => $arguments];
        $this->workersData[md5(serialize($data))] = $data;
    }

    /**
     * Installs the signal handlers, starts every registered worker and
     * subscribes to the workers' pipes so their output can be relayed.
     */
    public function run(): void
    {
        putenv('MASTER_PROCESS_PID=' . getmypid());

        \swoole_process::signal(SIGCHLD, [$this, 'rebootProcess']);
        \swoole_process::signal(SIGTERM, [$this, 'handleSignal']);
        \swoole_process::signal(SIGQUIT, [$this, 'handleSignal']);
        \swoole_process::signal(SIGINT, [$this, 'handleSignal']);
        \swoole_process::signal(SIGUSR1, [$this, 'handleSignal']);
        \swoole_process::signal(SIGUSR2, [$this, 'handleSignal']);

        foreach ($this->workersData as $dataId => $workerData) {
            // A counting loop (rather than range(1, $quantity)) starts nothing
            // for a quantity of 0; range(1, 0) would yield two iterations.
            for ($started = 0; $started < $workerData['quantity']; $started++) {
                $this->execProcess($dataId, $workerData);
            }
        }

        // https://wiki.swoole.com/wiki/page/215.html "Precautions"
        foreach ($this->pipes as $process) {
            swoole_event_add($process->pipe, [$this, 'onReceive']);
        }
    }

    /**
     * Event callback: relays data read from a worker pipe to stdout, inserting
     * the worker pid after every line break.
     *
     * @param int $pipe descriptor of the pipe that became readable
     */
    public function onReceive(int $pipe): void
    {
        if (array_key_exists($pipe, $this->pipes)) {
            $worker = $this->pipes[$pipe];
            echo str_replace(PHP_EOL, PHP_EOL . $worker->pid . " | ", $worker->read());
        }
    }

    /**
     * Handles the control signals registered in run().
     *
     * @param int $signal number of the received signal
     */
    public function handleSignal(int $signal): void
    {
        echo "master | signal caught\n";
        switch ($signal) {
            case SIGUSR1: // reload
                echo "master | reloading workers\n";
                foreach (array_keys($this->workers) as $cPid) {
                    \swoole_process::kill($cPid, SIGTERM);
                }
                break;
            case SIGUSR2: // print status
                echo "master | workers data status {dataId} => {data}:\n";
                foreach ($this->workersData as $dataId => $data) {
                    echo "master | {$dataId} => ".json_encode($data)."\n";
                }
                echo "master | workers status {pid} => {dataId} => {working}:\n";
                foreach ($this->workers as $cPid => $process) {
                    echo sprintf(
                        "master | %s => %s => %s\n",
                        $cPid,
                        $process->dataId,
                        \swoole_process::kill($cPid, 0) ? 'yes' : 'no'
                    );
                }
                echo "master | pipes status {pipe} => {datId}:\n";
                foreach ($this->pipes as $pipe => $process) {
                    echo "master | {$pipe} => {$process->dataId}\n";
                }
                // The parentheses are required: "." binds tighter than "?:", so
                // without them the whole line collapsed to the string "true".
                echo "master | exiting => '".($this->exiting ? 'true' : 'false')."'\n";
                break;
            case SIGTERM: // 15 : supervisor default stop
            case SIGQUIT: // 3 : kill -s QUIT
            case SIGINT: // 2 : ctrl+c
                $this->exiting = true;
                // With no workers there is nothing to wait for; the loop below
                // would never run and the master would never exit.
                if (empty($this->workers)) {
                    echo "master | exited\n";
                    exit;
                }
                foreach (array_keys($this->workers) as $cPid) {
                    \swoole_process::kill($cPid, SIGTERM);
                }
                while ($ret = \swoole_process::wait()) {
                    echo "master | child process {$ret['pid']} exited\n";
                    $this->removeProcess($ret['pid']);
                    if (empty($this->workers)) {
                        echo "master | exited\n";
                        exit;
                    }
                }
                break;
            default:
                break;
        }
    }

    /**
     * Starts one worker process for the given definition and records it in
     * the pid and pipe maps. The caller is responsible for subscribing to the
     * returned process' pipe.
     *
     * @param string $dataId     key of the definition in the registered workers
     * @param array  $workerData definition with 'execFile' and 'arguments'
     *
     * @return \swoole_process the started process
     */
    public function execProcess(string $dataId, array $workerData): \swoole_process
    {
        $process = new \swoole_process(function (\swoole_process $process) use ($workerData) {
            $process->exec($workerData['execFile'], $workerData['arguments']);
        }, true, true);
        // Remember which definition this process belongs to so that it can be
        // restarted with the same command.
        $process->dataId = $dataId;
        $process->start();

        $this->pipes[$process->pipe] = $process;
        $this->workers[$process->pid] = $process;

        return $process;
    }

    /**
     * SIGCHLD handler: reaps exited children and, unless the daemon is
     * shutting down, starts a replacement for each of them.
     */
    public function rebootProcess(): void
    {
        // Several children can exit before the handler runs once, so keep
        // reaping (without blocking) until no exited child is left.
        while ($ret = \swoole_process::wait(false)) {
            echo "master | Child process: {$ret['pid']} exited with status {$ret['code']}\n";
            if ($this->exiting) {
                continue;
            }
            if (!isset($this->workers[$ret['pid']])) {
                // Not a worker tracked by this daemon; nothing to restart.
                continue;
            }

            $oldProcess = $this->workers[$ret['pid']];
            $this->removeProcess($ret['pid']);

            $workerData = $this->workersData[$oldProcess->dataId];
            $newProcess = $this->execProcess($oldProcess->dataId, $workerData);
            // https://wiki.swoole.com/wiki/page/215.html "Precautions"
            swoole_event_add($newProcess->pipe, [$this, 'onReceive']);
            echo "master | Reboot process: {$ret['pid']} => {$newProcess->pid} Done\n";
        }
    }

    /**
     * Forgets a worker: unsubscribes from its pipe and drops it from the pid
     * and pipe maps. Unknown pids are ignored.
     *
     * @param int $pid pid of the worker to remove
     */
    public function removeProcess(int $pid): void
    {
        if (!isset($this->workers[$pid])) {
            return;
        }

        $worker = $this->workers[$pid];
        swoole_event_del($worker->pipe);
        unset($this->pipes[$worker->pipe]);
        unset($this->workers[$worker->pid]);
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
// bin/symfony_consume.php | |
use App\Infra\SymfonyDaemon; | |
use Symfony\Component\Process\ProcessBuilder; | |
require __DIR__.'/../vendor/autoload.php'; | |
// Worker command: `php bin/console enqueue:consume --setup-broker -vvv`,
// executed from the parent directory of this script's directory.
$projectRoot = realpath(__DIR__.'/..');
$consumeCommand = ['bin/console', 'enqueue:consume', '--setup-broker', '-vvv'];

$workerBuilder = new ProcessBuilder($consumeCommand);
$workerBuilder->setWorkingDirectory($projectRoot);
$workerBuilder->setPrefix('php');

// Run the daemon with three workers built from the command above.
$daemon = new SymfonyDaemon($workerBuilder);
$daemon->start(3);
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
namespace App\Infra; | |
use App\Infra\Enqueue\ConsumeDaemon; | |
use Symfony\Component\Process\ProcessBuilder; | |
require __DIR__.'/../vendor/autoload.php'; | |
// NOTE(review): these statements look like a copy of the bin/symfony_consume.php
// bootstrap pasted into the class file (they sit between two groups of `use`
// lines and instantiate ConsumeDaemon rather than SymfonyDaemon) - confirm
// whether they belong here.

// Worker command: `php bin/console enqueue:consume --setup-broker -vvv`,
// executed from the parent directory of this file's directory.
$projectRoot = realpath(__DIR__.'/..');
$consumeCommand = ['bin/console', 'enqueue:consume', '--setup-broker', '-vvv'];

$workerBuilder = new ProcessBuilder($consumeCommand);
$workerBuilder->setWorkingDirectory($projectRoot);
$workerBuilder->setPrefix('php');

// Run the daemon with three workers built from the command above.
$daemon = new ConsumeDaemon($workerBuilder);
$daemon->start(3);
use Symfony\Component\Process\Process; | |
use Symfony\Component\Process\ProcessBuilder; | |
/**
 * Keeps a fixed number of worker processes alive using symfony/process and
 * pcntl signals.
 *
 * Signals handled while start() is running:
 *  - SIGUSR1: send SIGQUIT to every running worker (the supervision loop then
 *    restarts them);
 *  - SIGTERM / SIGQUIT / SIGINT: stop every worker and exit.
 */
class SymfonyDaemon
{
    /**
     * Builder used to create every worker process.
     *
     * @var ProcessBuilder
     */
    private $workerBuilder;

    /**
     * Started worker processes keyed by worker id.
     *
     * @var Process[]
     */
    private $workers = [];

    /**
     * @param ProcessBuilder $workerBuilder builder describing the worker command
     */
    public function __construct(ProcessBuilder $workerBuilder)
    {
        $this->workerBuilder = $workerBuilder;
        $this->workers = [];
    }

    /**
     * Starts the workers and supervises them forever, restarting any worker
     * that stops running. Only returns by exiting on a terminating signal.
     *
     * @param int $workersNumber number of workers to keep running
     */
    public function start($workersNumber)
    {
        // pcntl invokes the handler outside of this class, so route it through
        // a closure bound to $this to reach the private method.
        $handleSignal = function ($signal) {
            $this->handleSignal($signal);
        };
        pcntl_signal(SIGTERM, $handleSignal);
        pcntl_signal(SIGQUIT, $handleSignal);
        pcntl_signal(SIGINT, $handleSignal);
        pcntl_signal(SIGUSR1, $handleSignal);

        // Worker ids run from 1 to $workersNumber and double as pool keys, so
        // a restarted worker keeps the id it was announced with. A counting
        // loop starts nothing for 0, whereas range(1, 0) would yield [1, 0].
        for ($id = 1; $id <= $workersNumber; $id++) {
            $this->startWorker($id);
        }

        while (true) {
            pcntl_signal_dispatch();

            foreach ($this->workers as $id => $worker) {
                if ($worker->isRunning()) {
                    continue;
                }

                echo sprintf('Worker %s exited with status %d', $id, $worker->getExitCode()).PHP_EOL;
                // Free the slot first; startWorker() refuses ids still in the pool.
                unset($this->workers[$id]);
                $this->startWorker($id);
            }

            pcntl_signal_dispatch();
            sleep(1);
        }
    }

    /**
     * Starts a single worker, echoing its output prefixed with the worker id,
     * and registers it in the pool under that id.
     *
     * @param int $workerId
     *
     * @return Process
     *
     * @throws \LogicException when the id is already in the pool or the
     *                         process could not be started
     */
    public function startWorker($workerId)
    {
        if (array_key_exists($workerId, $this->workers)) {
            throw new \LogicException(sprintf('Such worker %s is already in pool.', $workerId));
        }

        echo sprintf('Start worker %s', $workerId).PHP_EOL;

        $process = $this->workerBuilder->getProcess();
        $process->start(function ($type, $buffer) use ($workerId) {
            echo $workerId.' | '.$buffer;
        });

        if (false == $process->isStarted()) {
            throw new \LogicException('Cannot start a worker process.');
        }

        $this->workers[$workerId] = $process;

        return $process;
    }

    /**
     * Reacts to the signals registered in start().
     *
     * @param int $signal number of the received signal
     */
    private function handleSignal($signal)
    {
        switch ($signal) {
            case SIGUSR1:
                echo 'Daemon is reloading now.'.PHP_EOL;
                foreach ($this->workers as $worker) {
                    // Process::signal() throws on a process that is not
                    // running; such workers are restarted by the loop anyway.
                    if ($worker->isRunning()) {
                        $worker->signal(SIGQUIT);
                    }
                }
                break;
            case SIGTERM: // 15 : supervisor default stop
            case SIGQUIT: // 3 : kill -s QUIT
            case SIGINT: // 2 : ctrl+c
                foreach ($this->workers as $worker) {
                    $worker->stop(3, SIGQUIT);
                }
                // Wait until every worker is gone before exiting.
                while ($this->workers) {
                    foreach ($this->workers as $id => $worker) {
                        if (false == $worker->isRunning()) {
                            unset($this->workers[$id]);
                        }
                    }
                    if ($this->workers) {
                        // Avoid spinning at full speed while waiting.
                        usleep(100000);
                    }
                }
                exit;
            default:
                break;
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment