Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Our initial proof-of-concept hack for running parallelized jobs without RabbitMQ.
<?php
namespace Application\CronBundle\Services;
use Symfony\Component\Process\Process;
/**
 * Collects shell commands as Symfony Process instances and runs them in
 * fixed-size batches ("queues"). Every process of a batch runs concurrently;
 * the next batch only starts once the current one has fully stopped.
 * Progress messages and the processes' own output are echoed to stdout.
 */
class CommandAsyncHelper
{
    /** @var Process[] Processes registered through addCommand(), in insertion order. */
    private $processes;

    public function __construct()
    {
        $this->processes = array();
    }

    /**
     * @return Process[] Every process registered so far.
     */
    public function getProcesses()
    {
        return $this->processes;
    }

    /**
     * Wraps a command in a Process and appends it to the list of work.
     *
     * @param mixed $command Command handed unchanged to the Process constructor.
     *
     * @return $this
     */
    public function addCommand($command)
    {
        $this->processes[] = new Process($command);

        return $this;
    }

    /**
     * Starts every process of one batch and polls them (once per second)
     * until all of them have stopped, echoing their output along the way.
     *
     * @param array $processes Process-like objects keyed by their position in the batch.
     * @param mixed $queueId   Identifier of the batch; only used to label log lines.
     *
     * @return bool Always true, once every process has stopped.
     */
    public function async($processes, $queueId)
    {
        // Labels of the processes this call has already started, so that a
        // process is never started twice.
        $started = array();

        echo "Entering async loop: ". $queueId ."\n\r";

        while (count($processes) > 0) {
            foreach ($processes as $i => $process) {
                $pid = $queueId . ':' . $i;

                if (!$process->isStarted() && !isset($started[$pid])) {
                    echo "Process id: ". $pid." starts\n";
                    $started[$pid] = true;
                    $process->start();
                    continue;
                }

                $this->echoIncrementalOutput($process);

                if ($process->isRunning()) {
                    continue;
                }

                // The status check itself may have buffered more output; drain
                // once more so the tail of a finished process is not lost when
                // it is dropped from the batch below.
                $this->echoIncrementalOutput($process);

                echo "Process id: ".$pid." stopped\n";
                unset($processes[$i]);
                echo count($processes) . " processes remain.\n";
            }

            // Only wait when something is still running; an empty batch
            // returns immediately.
            if (count($processes) > 0) {
                sleep(1);
            }
        }

        return true;
    }

    /**
     * Splits the registered processes into batches of $size and runs the
     * batches one after another through async().
     *
     * @param int $size Number of processes per batch; passed straight to array_chunk().
     *
     * @return bool Always true, once every batch has been run.
     */
    public function executeQueueWithSize($size)
    {
        // array_chunk() re-indexes each batch from 0, so a process is labelled
        // "<batch index>:<position within batch>" inside async().
        $queues = array_chunk($this->processes, $size);

        echo "Total size: ". count($this->processes) . "\n";
        echo "Broken down into " . count($queues) . " queues of " . $size . "\n";

        foreach ($queues as $i => $queue) {
            if ($this->async($queue, $i)) {
                unset($queues[$i]);
                echo "Exiting async loop: " . $i ."\n";
            }
            echo count($queues) . " queues remain. \n";
        }

        return true;
    }

    /**
     * Echoes whatever stdout and stderr data the process has produced since
     * the previous call.
     *
     * @param object $process Object exposing getIncrementalOutput() and getIncrementalErrorOutput().
     */
    private function echoIncrementalOutput($process)
    {
        echo $process->getIncrementalOutput();
        echo $process->getIncrementalErrorOutput();
    }
}
<?php
// We follow an orchestrator pattern for our imports: one command compiles the list of all items to act on,
// and another command carries out the action on a single item. This gives us both a "batch" command and a "singular" one with minimal effort.
//...
// Build one "framework:action:import" console command per item and register
// it with the async helper.
// NOTE(review): the original snippet iterated over the bare constant `item`;
// `$items` is assumed to be the list built in the elided code above - confirm the name.
foreach ($items as $i) {
    $command = array(
        'php',
        'app/console',
        'framework:action:import',
        '--no-debug',
        '--',
        $i["id"],
    );

    // NOTE(review): the original guarded on "other_id" but appended
    // "location_id"; the guard now tests the value that is actually appended
    // so a missing/null location is never passed along - confirm the intended key.
    if (isset($i["location_id"])) {
        $command[] = $i["location_id"];
    }

    // Quote every argument so values containing spaces or shell metacharacters
    // reach the console command as a single, literal argument.
    $command = implode(' ', array_map('escapeshellarg', $command));
    $async->addCommand($command);
}

// Tell the queueing service how many processes to run per batch.
if ($async->executeQueueWithSize(7)) {
    echo "Done!";
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.