Skip to content

Instantly share code, notes, and snippets.

@wodka
Last active February 18, 2021 20:26
Show Gist options
  • Save wodka/23475d36cf13e956b8db7578bf6251ed to your computer and use it in GitHub Desktop.
Save wodka/23475d36cf13e956b8db7578bf6251ed to your computer and use it in GitHub Desktop.
Simple Symfony3 command Thread Pool
<?php
namespace AppBundle\Command;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Process\Process;
/**
 * Basic worker pool: keeps a fixed number of copies of a console command running.
 *
 * Every slot of the pool runs `php [--php-ini <file>] bin/console <exec>` as a
 * child process. A slot whose process has ended is started again on the next
 * tick. Sending SIGTERM to the master stops the pool and its workers.
 */
class ThreadPoolCommand extends Command
{
    /** Name of the option holding the number of workers. */
    const INPUT_THREADS = 'threads';
    /** Name of the argument holding the console command to run in every worker. */
    const INPUT_COMMAND = 'exec';
    /** Pause between two inspections of the pool, in microseconds (passed to usleep()). */
    const TICK = 500000;
    /** Largest worker count accepted for the "threads" option. */
    const MAX_THREADS = 20;
    /** Seconds the workers are given to exit after SIGTERM before they are force-stopped. */
    const STOP_TIMEOUT = 10;

    /**
     * Declare the command name, the worker-count option and the command argument.
     */
    protected function configure()
    {
        $this
            ->setName('thread:pool')
            ->addOption(self::INPUT_THREADS, 't', InputOption::VALUE_OPTIONAL, 'number of threads to work (default is one)', 1)
            ->addArgument(self::INPUT_COMMAND, InputArgument::REQUIRED, 'command to run (to include parameters add quotes)')
        ;
    }

    /**
     * Run the pool until the master process receives SIGTERM.
     *
     * @param InputInterface  $input
     * @param OutputInterface $output
     *
     * @return int 1 when the thread count is not an integer in [1, MAX_THREADS],
     *             0 after the pool has been stopped by SIGTERM
     */
    public function execute(InputInterface $input, OutputInterface $output)
    {
        // The option arrives as a string (or null for a bare "-t"). Validate it as
        // a real integer so values such as "2.5" or "3abc" cannot make the loop
        // bounds disagree with the size of the pool array.
        $threads = filter_var(
            $input->getOption(self::INPUT_THREADS),
            FILTER_VALIDATE_INT,
            ['options' => ['min_range' => 1, 'max_range' => self::MAX_THREADS]]
        );
        if (false === $threads) {
            $output->writeln('pool[master] <error>invalid thread count</error>');

            return 1;
        }

        // The command line is identical for every worker, so build it once.
        $commandLine = implode(' ', $this->buildCommand($input));

        /** @var Process[]|null[] $pool one slot per worker, null until first start */
        $pool = array_fill(0, $threads, null);
        $logger = $this->buildLoggers($threads);
        $output->writeln('pool[master] start with <info>'.$threads.'</info> workers');

        $process = true;
        // $siginfo is optional because PHP versions before 7.1 call the handler
        // with the signal number only.
        pcntl_signal(SIGTERM, function ($signo, $siginfo = null) use ($output, &$process) {
            $output->writeln('pool[master] received SIGTERM - stop threads');
            $process = false;
        });

        while (true) {
            // Pending signals are only delivered to the handler here.
            pcntl_signal_dispatch();
            if (!$process) {
                break;
            }

            for ($i = 0; $i < $threads; $i++) {
                $worker = $pool[$i];
                if ($worker instanceof Process) {
                    if ($worker->isRunning() || !$worker->isStarted()) {
                        continue;
                    }
                    // Any exit of a worker is reported as a failure before the restart.
                    $output->writeln(sprintf('pool[%d] %s failed', $i, $commandLine));
                }

                $output->writeln(sprintf('pool[%d] %s started', $i, $commandLine));
                $pool[$i] = new Process($commandLine);
                // stop output caching and stream it to stderr and stdout
                $pool[$i]->disableOutput();
                $pool[$i]->start($logger[$i]);
            }

            usleep(self::TICK);
        }

        // The loop is only left after SIGTERM: shut the workers down and report
        // a regular stop instead of an error.
        $this->stopWorkers($pool);
        $output->writeln('pool[master] stopped');

        return 0;
    }

    /**
     * Ask all running workers to terminate and wait for them to exit.
     *
     * Every running worker receives SIGTERM first. The pool is then polled every
     * TICK microseconds for at most STOP_TIMEOUT seconds; workers that are still
     * running afterwards are stopped with Process::stop(0).
     *
     * @param array $pool pool slots (Process instances or null)
     */
    private function stopWorkers(array $pool)
    {
        $isAlive = function ($worker) {
            return $worker instanceof Process && $worker->isRunning();
        };

        $alive = array_filter($pool, $isAlive);
        foreach ($alive as $worker) {
            try {
                $worker->signal(SIGTERM);
            } catch (\LogicException $e) {
                // The worker ended between the isRunning() check and the signal;
                // there is nothing left to stop.
            }
        }

        $deadline = microtime(true) + self::STOP_TIMEOUT;
        while ([] !== $alive && microtime(true) < $deadline) {
            usleep(self::TICK);
            $alive = array_filter($alive, $isAlive);
        }

        foreach ($alive as $worker) {
            $worker->stop(0);
        }
    }

    /**
     * Build one output callback per worker.
     *
     * Each callback writes the trimmed buffer to STDERR (for Process::ERR) or to
     * STDOUT (anything else), prefixing every line with "pool[<index>] ".
     *
     * @param int $threads number of callbacks to build
     *
     * @return callable[] callbacks indexed by worker number
     */
    private function buildLoggers($threads)
    {
        $logger = [];
        for ($i = 0; $i < $threads; $i++) {
            $prefix = 'pool['.$i.'] ';
            // A closure replaces create_function(), which no longer exists in PHP 8.
            $logger[] = function ($type, $buffer) use ($prefix) {
                $stream = Process::ERR === $type ? STDERR : STDOUT;
                fwrite($stream, $prefix.str_replace(PHP_EOL, PHP_EOL.$prefix, trim($buffer)).PHP_EOL);
            };
        }

        return $logger;
    }

    /**
     * Build the parts of the worker command line.
     *
     * The result is `php`, optionally `--php-ini <loaded ini file>` (the original
     * author notes this is needed for a custom php.ini on Heroku), `bin/console`
     * and the raw "exec" argument. The argument is appended unescaped on purpose:
     * it may carry the sub-command together with its own parameters.
     *
     * @param InputInterface $input
     *
     * @return array command line parts, to be joined with spaces
     */
    private function buildCommand(InputInterface $input)
    {
        $exec = ['php'];

        $iniPath = php_ini_loaded_file();
        if (false !== $iniPath) {
            // Quote the path so a directory containing spaces does not split the argument.
            $exec[] = '--php-ini '.escapeshellarg($iniPath);
        }

        $exec[] = 'bin/console';
        $exec[] = $input->getArgument(self::INPUT_COMMAND);

        return $exec;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment