Simple Symfony3 command Thread Pool
<?php | |
namespace AppBundle\Command; | |
use Symfony\Component\Console\Command\Command; | |
use Symfony\Component\Console\Input\InputArgument; | |
use Symfony\Component\Console\Input\InputInterface; | |
use Symfony\Component\Console\Input\InputOption; | |
use Symfony\Component\Console\Output\OutputInterface; | |
use Symfony\Component\Process\Process; | |
/**
 * Basic worker pool that keeps a fixed number of worker commands running.
 * Each worker is a separate child process (not an in-process thread).
 */
class ThreadPoolCommand extends Command
{
    /** Name of the option that holds the number of workers. */
    const INPUT_THREADS = 'threads';

    /** Name of the argument that holds the console command every worker runs. */
    const INPUT_COMMAND = 'exec';

    /** Pause between two supervision rounds, in microseconds (passed to usleep()). */
    const TICK = 500000;

    /** Largest accepted worker count. */
    const MAX_THREADS = 20;

    /**
     * Registers the command name, the worker-count option and the command argument.
     */
    protected function configure()
    {
        $this
            ->setName('thread:pool')
            ->addOption(self::INPUT_THREADS, 't', InputOption::VALUE_OPTIONAL, 'number of threads to work (default is one)', 1)
            ->addArgument(self::INPUT_COMMAND, InputArgument::REQUIRED, 'command to run (to include parameters add quotes)')
        ;
    }

    /**
     * Runs the pool: starts one worker process per slot and restarts a slot's
     * worker whenever it is no longer running, until SIGTERM is received.
     *
     * @param InputInterface  $input
     * @param OutputInterface $output
     *
     * @return int 1 when the worker count is invalid, 0 once the pool stopped
     */
    public function execute(InputInterface $input, OutputInterface $output)
    {
        // Validate strictly: the option arrives as a string (or null when given
        // without a value), and array_fill() below needs a real integer.
        $threads = filter_var(
            $input->getOption(self::INPUT_THREADS),
            FILTER_VALIDATE_INT,
            ['options' => ['min_range' => 1, 'max_range' => self::MAX_THREADS]]
        );
        if (false === $threads) {
            $output->writeln('pool[master] <error>invalid thread count</error>');

            return 1;
        }

        // The command line is identical for every worker, so build it once.
        $commandLine = implode(' ', $this->buildCommand($input));

        /** @var Process[]|null[] $pool one slot per worker, null until first start */
        $pool = array_fill(0, $threads, null);
        $logger = $this->buildLoggers($threads);

        $output->writeln('pool[master] start with <info>'.$threads.'</info> workers');

        $process = true;
        // $siginfo is only supplied by newer PHP versions, hence the default.
        pcntl_signal(SIGTERM, function ($signo, $siginfo = null) use ($output, &$process) {
            $output->writeln('pool[master] received SIGTERM - stop threads');
            $process = false;
        });

        while ($process) {
            // Pending signals are only delivered here; the handler may flip $process.
            pcntl_signal_dispatch();

            for ($i = 0; $i < $threads; $i++) {
                $worker = $pool[$i];

                if ($worker instanceof Process && ($worker->isRunning() || !$worker->isStarted())) {
                    if (!$process) {
                        // Shutdown requested: forward the signal to the live worker.
                        $worker->signal(SIGTERM);
                    }
                    continue;
                }

                if (!$process) {
                    // Shutting down: do not (re)start anything.
                    continue;
                }

                if ($worker instanceof Process) {
                    // NOTE(review): printed for every finished worker, whatever its exit code.
                    $output->writeln(sprintf('pool[%d] %s failed', $i, $commandLine));
                }

                $output->writeln(sprintf('pool[%d] %s started', $i, $commandLine));

                $pool[$i] = new Process($commandLine);
                // stop output caching and stream it to stderr and stdout
                $pool[$i]->disableOutput();
                $pool[$i]->start($logger[$i]);
            }

            usleep(self::TICK);
        }

        // NOTE(review): the loop above only ends after SIGTERM, so this "error"
        // line is also what a regular shutdown prints; wording kept unchanged.
        $output->writeln('pool[master] <error>error with pool</error>');

        return 0;
    }

    /**
     * Builds one output callback per worker slot. Each callback writes the
     * worker's output to STDERR or STDOUT (depending on the output type) and
     * prefixes every line with "pool[<slot>] ".
     *
     * @param int $threads number of worker slots
     *
     * @return callable[] callbacks indexed by slot number
     */
    private function buildLoggers($threads)
    {
        $logger = [];
        for ($i = 0; $i < $threads; $i++) {
            $prefix = 'pool['.$i.'] ';
            // A closure replaces create_function(), which no longer exists in PHP 8.
            $logger[] = static function ($type, $buffer) use ($prefix) {
                $stream = Process::ERR === $type ? STDERR : STDOUT;
                fwrite($stream, $prefix.str_replace(PHP_EOL, PHP_EOL.$prefix, trim($buffer)).PHP_EOL);
            };
        }

        return $logger;
    }

    /**
     * Builds the parts of the worker command line: the php binary, the
     * currently loaded php.ini (if any, e.g. a custom one on heroku),
     * bin/console and the user-supplied command.
     *
     * @param InputInterface $input
     *
     * @return string[] command line parts, to be joined with spaces
     */
    private function buildCommand(InputInterface $input)
    {
        $exec = ['php'];

        $iniPath = php_ini_loaded_file();
        if ($iniPath) {
            // Quote the path so that spaces or shell metacharacters cannot break the command.
            $exec[] = '--php-ini '.escapeshellarg($iniPath);
        }

        $exec[] = 'bin/console';
        // Appended verbatim on purpose: the argument may carry its own parameters.
        $exec[] = $input->getArgument(self::INPUT_COMMAND);

        return $exec;
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment