Skip to content

Instantly share code, notes, and snippets.

@ahmed-bhs
Created March 3, 2020 10:27
Show Gist options
  • Save ahmed-bhs/1d210c1f6d7468b93d1ff98a43087709 to your computer and use it in GitHub Desktop.
Save ahmed-bhs/1d210c1f6d7468b93d1ff98a43087709 to your computer and use it in GitHub Desktop.
<?php declare(strict_types=1);
namespace GA\Bundle\MonInstallateurBundle\Command;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Driver\Statement;
use eZ\Publish\API\Repository\Repository;
use eZ\Publish\SPI\Persistence\Content\ContentInfo;
use eZ\Publish\SPI\Persistence\Content\Location\Handler;
use InvalidArgumentException;
use PDO;
use Psr\Log\LoggerInterface;
use RuntimeException;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Helper\ProgressBar;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Process\PhpExecutableFinder;
use Symfony\Component\Process\Process;
use eZ\Publish\Core\MVC\ConfigResolverInterface;
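/**
* Console command that deletes contents, either the ones listed in the
* "content-ids" option or every published content found under "parent-location".
*/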
class DeleteChildrenContentsCommand extends Command
{
/** @var string */
protected static $defaultName = 'ezplatform:delete:contents';
/** @var Repository */
private $repository;
/** @var \Doctrine\DBAL\Connection */
private $connection;
/** @var \eZ\Publish\SPI\Persistence\Content\Location\Handler */
private $locationHandler;
/** @var string|null */
private $phpPath;
/** @var \Psr\Log\LoggerInterface */
private $logger;
/** @var string */
private $siteaccess;
/** @var bool */
private $isDebug;
/** @var string */
protected $projectDir;
/**
 * Stores the collaborators and settings used by the command.
 *
 * @param Repository      $repository      Repository used to load and delete contents.
 * @param Connection      $connection      DBAL connection used to list the content IDs of a subtree.
 * @param Handler         $locationHandler Persistence handler used to load the parent location.
 * @param LoggerInterface $logger          Receives the errors reported for failed child processes.
 * @param string          $siteaccess      Siteaccess forwarded to child processes; not forwarded when empty.
 * @param bool            $isDebug         When false, child processes are started with "--no-debug".
 * @param string|null     $phpPath         PHP executable used for child processes; looked up lazily when empty.
 * @param string          $projectDir      Directory that contains "bin/console".
 */
public function __construct(
    Repository $repository,
    Connection $connection,
    Handler $locationHandler,
    LoggerInterface $logger,
    string $siteaccess,
    bool $isDebug,
    ?string $phpPath,
    string $projectDir
) {
    $this->repository = $repository;
    $this->connection = $connection;
    $this->locationHandler = $locationHandler;
    $this->logger = $logger;
    $this->siteaccess = $siteaccess;
    $this->isDebug = $isDebug;
    // An explicit nullable type replaces the former "string $phpPath = null": a defaulted
    // parameter placed before a required one can never actually be omitted.
    $this->phpPath = $phpPath;
    $this->projectDir = $projectDir;
    parent::__construct();
}
/**
 * Declares the command description and its options.
 *
 * The "no-commit", "no-purge" and "since" options are still declared so that existing
 * invocations keep parsing, but this command does not read them.
 */
protected function configure()
{
    $this
        ->setDescription('Deletes the contents listed in "content-ids", or every published content located under "parent-location".')
        ->addOption(
            'iteration-count',
            'c',
            InputOption::VALUE_OPTIONAL,
            'Number of contents to be deleted in a single iteration, set to avoid using too much memory',
            50
        )->addOption(
            'no-commit',
            null,
            InputOption::VALUE_NONE,
            'Unused by this command, accepted for backward compatibility only'
        )->addOption(
            'no-purge',
            null,
            InputOption::VALUE_NONE,
            'Unused by this command, accepted for backward compatibility only'
        )->addOption(
            'since',
            null,
            InputOption::VALUE_OPTIONAL,
            'Unused by this command, accepted for backward compatibility only'
        )->addOption(
            'content-ids',
            null,
            InputOption::VALUE_OPTIONAL,
            'Comma-separated list of content ID\'s to delete. When provided, "parent-location" is ignored'
        )->addOption(
            'parent-location',
            null,
            InputOption::VALUE_OPTIONAL,
            'Location ID whose published descendant contents will be deleted (the content of the Location itself is kept). Ignored when "content-ids" is provided'
        )->addOption(
            'processes',
            null,
            InputOption::VALUE_OPTIONAL,
            'Number of child processes to run in parallel for iterations, if set to "auto" it will set to number of CPU cores -1, set to "1" or "0" to disable',
            'auto'
        )
    ;
}
/**
 * Deletes the requested contents.
 *
 * When "content-ids" is given, exactly those contents are deleted in this process.
 * Otherwise the published contents below "parent-location" are listed in batches of
 * "iteration-count" IDs and each batch is deleted either inline or by a child process,
 * depending on "processes".
 *
 * @return int Always 0; failures surface as exceptions.
 *
 * @throws InvalidArgumentException When neither "content-ids" nor a positive "parent-location"
 *                                  is given, or when "iteration-count" is not a positive integer.
 */
protected function execute(InputInterface $input, OutputInterface $output)
{
    $contentIdsOption = $input->getOption('content-ids');
    if ($contentIdsOption) {
        // Blank entries (e.g. "1,,2" or a trailing comma) are dropped instead of being sent to the repository.
        $contentIds = \array_values(\array_filter(
            \array_map('trim', \explode(',', (string) $contentIdsOption)),
            static function (string $contentId): bool {
                return '' !== $contentId;
            }
        ));
        $output->writeln(\sprintf(
            'Deleting %s of content ID ',
            \count($contentIds)
        ));
        $this->deleteContents($contentIds);

        return 0;
    }

    $parentLocationId = (int) $input->getOption('parent-location');
    if ($parentLocationId <= 0) {
        throw new InvalidArgumentException('Either the "content-ids" or the "parent-location" option must be provided');
    }

    // A batch size of zero would divide by zero below and produce empty batches.
    $iterationCount = (int) $input->getOption('iteration-count');
    if ($iterationCount < 1) {
        throw new InvalidArgumentException('The "iteration-count" option must be a positive integer');
    }

    $stmt = $this->getStatementSubtree($parentLocationId);
    $count = (int) $this->getStatementSubtree($parentLocationId, true)->fetchColumn();
    // ceil() returns a float; the progress bar and the process count both need an integer.
    $iterations = (int) \ceil($count / $iterationCount);

    $processes = $input->getOption('processes');
    $processCount = 'auto' === $processes ? $this->getNumberOfCPUCores() - 1 : (int) $processes;
    // Never start more processes than there are batches to hand out.
    $processCount = \min($iterations, $processCount);

    $progress = new ProgressBar($output);
    $progress->start($iterations);
    if ($processCount > 1) {
        $this->runParallelProcess($progress, $stmt, $processCount, $iterationCount);
    } else {
        // With a single process, or too few iterations to warrant several, everything is deleted inline.
        foreach ($this->fetchIteration($stmt, $iterationCount) as $batch) {
            $this->deleteContents($batch);
            $progress->advance(1);
        }
    }
    $progress->finish();

    return 0;
}

/**
 * Deletes each of the given contents inside a Repository::sudo() callback.
 *
 * @param array $contentIds Content IDs as read from the command line or from the database.
 */
private function deleteContents(array $contentIds)
{
    foreach ($contentIds as $contentId) {
        $this->repository->sudo(static function (Repository $repository) use ($contentId) {
            $contentService = $repository->getContentService();
            // IDs arrive as strings (CLI input, database rows); only the content info is needed to delete.
            $contentService->deleteContent($contentService->loadContentInfo((int) $contentId));
        });
    }
}
/**
 * Reads content IDs from the statement and hands them out in batches.
 *
 * @param Statement $stmt           Statement whose rows are read one column at a time.
 * @param int       $iterationCount Maximum number of content IDs per batch.
 *
 * @return \Generator Yields arrays of content IDs; every batch except possibly the last
 *                    holds $iterationCount IDs.
 */
private function fetchIteration(Statement $stmt, $iterationCount)
{
    for (;;) {
        $batch = [];
        $exhausted = false;

        // Fill the batch until it is full or the statement has no more rows.
        while (!$exhausted && \count($batch) < $iterationCount) {
            $fetchedId = $stmt->fetch(PDO::FETCH_COLUMN);
            if ($fetchedId) {
                $batch[] = $fetchedId;
            } else {
                $exhausted = true;
            }
        }

        // Nothing left to read and nothing collected: stop without yielding.
        if ($exhausted && [] === $batch) {
            return;
        }

        yield $batch;

        // A short (or empty) batch means the previous read was the last one.
        if ($exhausted || [] === $batch) {
            return;
        }
    }
}
/**
 * Runs the query selecting the published contents located under a location.
 *
 * The content attached to the location itself is excluded from the result.
 *
 * @param mixed $locationId ID of the location whose path string prefixes the matched tree rows.
 * @param bool  $count      When true, select the number of distinct content IDs instead of the IDs.
 *
 * @return \Doctrine\DBAL\Driver\Statement
 */
private function getStatementSubtree($locationId, $count = false)
{
    $parentLocation = $this->locationHandler->load($locationId);

    $selection = 'DISTINCT c.id';
    if ($count) {
        $selection = 'count(DISTINCT c.id)';
    }

    $queryBuilder = $this->connection->createQueryBuilder();
    $queryBuilder
        ->select($selection)
        ->from('ezcontentobject', 'c')
        ->innerJoin('c', 'ezcontentobject_tree', 't', 't.contentobject_id = c.id')
        ->where('c.status = :status')
        ->andWhere('c.id != :contentId')
        ->andWhere('t.path_string LIKE :path');

    $queryBuilder->setParameter('status', ContentInfo::STATUS_PUBLISHED, PDO::PARAM_INT);
    $queryBuilder->setParameter('contentId', $parentLocation->contentId, PDO::PARAM_INT);
    $queryBuilder->setParameter('path', $parentLocation->pathString . '%', PDO::PARAM_STR);

    return $queryBuilder->execute();
}
/**
 * Deletes the batches read from the statement using a pool of child processes.
 *
 * Each slot of the pool holds at most one running process. Once a second, finished slots
 * are refilled with the next batch; a slot is dropped when no batch is left, and the
 * method returns when every slot has been dropped.
 *
 * @param ProgressBar $progress       Advanced by one each time a child process finishes.
 * @param Statement   $stmt           Statement the content IDs are read from.
 * @param int         $processCount   Number of slots in the pool.
 * @param int         $iterationCount Number of content IDs handed to each child process.
 */
private function runParallelProcess(ProgressBar $progress, Statement $stmt, $processCount, $iterationCount)
{
    /** @var \Symfony\Component\Process\Process[]|null[] $slots */
    $slots = \array_fill(0, $processCount, null);
    $batches = $this->fetchIteration($stmt, $iterationCount);

    do {
        foreach ($slots as $slot => $child) {
            if (null !== $child) {
                if ($child->isRunning()) {
                    continue;
                }

                // The child occupying this slot has just finished.
                $progress->advance(1);
                if (!$child->isSuccessful()) {
                    $this->logger->error('Child indexer process returned: ' . $child->getExitCodeText());
                }
            }

            if ($batches->valid()) {
                $next = $this->getPhpProcess($batches->current());
                $slots[$slot] = $next;
                $next->start();
                $batches->next();
                continue;
            }

            // No batch left for this slot: retire it.
            unset($slots[$slot]);
        }

        if (!empty($slots)) {
            \sleep(1);
        }
    } while (!empty($slots));
}
/**
 * Detects the number of CPU cores of the machine.
 *
 * Tries, in order: counting "processor" entries in /proc/cpuinfo, asking "wmic" when the
 * directory separator is a backslash, and reading "hw.ncpu" from the output of "sysctl -a".
 *
 * @return int Detected number of cores, never less than 1.
 */
private function getNumberOfCPUCores()
{
    $cores = 1;
    if (\is_file('/proc/cpuinfo')) {
        // Linux (and potentially Windows with linux sub systems)
        $cpuinfo = \file_get_contents('/proc/cpuinfo');
        if (false !== $cpuinfo) {
            $cores = (int) \preg_match_all('/^processor/m', $cpuinfo);
        }
    } elseif ('\\' === \DIRECTORY_SEPARATOR) {
        // Windows
        if (false !== ($process = @\popen('wmic cpu get NumberOfCores', 'rb'))) {
            // The first line is the column header, the second one carries the value.
            \fgets($process);
            $cores = (int) \fgets($process);
            \pclose($process);
        }
    } elseif (false !== ($process = @\popen('sysctl -a', 'rb'))) {
        // *nix (Linux, BSD and Mac)
        $output = (string) \stream_get_contents($process);
        if (\preg_match('/hw\.ncpu: (\d+)/', $output, $matches)) {
            // $matches[1] is the whole captured number; indexing it further would keep only its first digit.
            $cores = (int) $matches[1];
        }
        \pclose($process);
    }

    // A failed detection must not report zero cores.
    return \max(1, $cores);
}
/**
 * Returns the PHP executable used to start child processes.
 *
 * The configured path is used when there is one; otherwise the executable is looked up
 * with PhpExecutableFinder and remembered for subsequent calls.
 *
 * @return string
 *
 * @throws RuntimeException When no PHP executable can be found.
 */
private function getPhpPath()
{
    if (!$this->phpPath) {
        $this->phpPath = (new PhpExecutableFinder())->find();

        if (!$this->phpPath) {
            throw new RuntimeException('The php executable could not be found. It is needed for executing parallel subprocesses, so add it to your PATH environment variable and try again');
        }
    }

    return $this->phpPath;
}
/**
 * Builds (without starting) the child process that deletes the given contents.
 *
 * The child runs this same console command with "--content-ids", forwarding the
 * configured siteaccess when there is one and "--no-debug" when debug is off.
 *
 * @param array $contentIds Content IDs to delete in the child process.
 *
 * @return \Symfony\Component\Process\Process Process with its timeout disabled.
 *
 * @throws InvalidArgumentException When $contentIds is empty.
 */
private function getPhpProcess(array $contentIds)
{
    if (empty($contentIds)) {
        // The native exception takes (message, int code): both parts belong in the message.
        throw new InvalidArgumentException("Argument '--content-ids' is invalid: \$contentIds cannot be empty");
    }

    $subProcessArgs = [
        $this->getPhpPath(),
        \sprintf('%s/%s', $this->projectDir, 'bin/console'),
        'ezplatform:delete:contents',
        '--content-ids=' . \implode(',', $contentIds),
    ];
    // Only the injected siteaccess is forwarded; no project-specific value is hard-coded.
    if ($this->siteaccess) {
        $subProcessArgs[] = '--siteaccess=' . $this->siteaccess;
    }
    if (!$this->isDebug) {
        $subProcessArgs[] = '--no-debug';
    }

    $process = new Process($subProcessArgs);
    // Deleting a batch can take arbitrarily long.
    $process->setTimeout(null);

    return $process;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment