Skip to content

Instantly share code, notes, and snippets.

@cesurapp
Created September 11, 2020 11:33
Show Gist options
  • Save cesurapp/42eaf953172fc7ea6a8b193694645324 to your computer and use it in GitHub Desktop.
Save cesurapp/42eaf953172fc7ea6a8b193694645324 to your computer and use it in GitHub Desktop.
Symfony Async Query using Process Component
<?php
namespace App\Service;
use Doctrine\Common\Collections\ArrayCollection;
use Doctrine\ORM\Query\Parameter;
use Symfony\Component\Process\Process;
/**
* Async SQL Query
*/
class AsyncQuery
{
/**
* @var Process[]
*/
private array $process = [];
private array $sqls = [];
private array $result = [];
/**
* Add SQL
*/
public function addSql(string $uid, string $sql, array $parameters = [], array $type = []): void
{
if ($parameters instanceof ArrayCollection) {
$parameters = $parameters->map(static function (Parameter $param) {
return $param->getValue();
})->toArray();
}
$this->sqls[$uid] = [
'query' => $sql,
'param' => $parameters,
'type' => $type
];
}
/**
* Non-Blocking Process in Background
*/
public function start(): void
{
foreach ($this->sqls as $uid => $sql) {
$process = Process::fromShellCommandline(sprintf('bin/console app:run-query --sql=%s --param=%s --type=%s',
base64_encode($sql['query']),
base64_encode(serialize($sql['param'])),
base64_encode(serialize($sql['type'])))
, getcwd() . '/../');
$process->setTimeout(180);
$process->start();
$this->process[$uid] = $process;
}
}
/**
* Wait for all Process to complete.
*/
public function startAndWait(): void
{
$this->start();
foreach ($this->process as $process) {
$process->wait();
}
}
/**
* Get results of all Process.
*/
public function getResult(): array
{
foreach ($this->process as $uid => $process) {
try {
$this->result[$uid] = $process->isSuccessful() ? unserialize(trim($process->getOutput())) : $process->getErrorOutput();
} catch (\Exception $e) {
$this->result[$uid] = $e->getMessage() . PHP_EOL . $process->getOutput();
}
}
return $this->result;
}
}
<?php
namespace App\Command;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\DBALException;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
/**
* Run Background SQL Query
*/
class RunQueryCommand extends Command
{
protected static $defaultName = 'app:run-query';
/**
* @var Connection
*/
private Connection $connection;
public function __construct(Connection $connection)
{
parent::__construct();
$this->connection = $connection;
}
protected function configure()
{
$this
->setDescription('Executes SQL commands in the background.')
->addOption('sql', null, InputOption::VALUE_OPTIONAL, 'SQL Query', false)
->addOption('param', null, InputOption::VALUE_OPTIONAL, 'Query Parameter', false)
->addOption('type', null, InputOption::VALUE_OPTIONAL, 'Parameter Type', false);
}
protected function execute(InputInterface $input, OutputInterface $output)
{
if ($input->getOption('sql') !== false) {
$output->write($this->runSql(
base64_decode($input->getOption('sql')),
unserialize(base64_decode($input->getOption('param'))),
unserialize(base64_decode($input->getOption('type'))),
));
}
return 0;
}
private function runSql(string $queryStr, array $parameters, $type = [])
{
try {
$query = $this->connection->executeQuery($queryStr, $parameters, $type);
} catch (DBALException $e) {
return $e->getMessage();
}
return serialize($query->fetchAll());
}
}
@cesurapp
Copy link
Author

Example:

$asyncQuery->addSql('Query1', 'Select * from user WHERE id = ?', [1]);
$asyncQuery->addSql('Query2', 'Select * from user WHERE id IN (:id)', ['id' => [1,2,3]], [
    'id' => Connection::PARAM_INT_ARRAY
]);
...
$asyncQuery->startAndWait();
$results = $asyncQuery->getResult();

dump($results['Query1'], $results['Query2'])

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment