Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Prooph Projection with GapDetection
<?php
declare(strict_types=1);
/**
* TRIFFT BACKEND
* @copyright Copyright (c) 2019 TRIFFT ME s.r.o. (https://www.trifft.me)
* @author Matus Nickel <matus@trifft.me>
*/
namespace App\Infrastructure\Command;
use Prooph\Bundle\EventStore\Command\AbstractProjectionCommand;
use Prooph\Bundle\EventStore\Command\FormatsOutput;
use Prooph\Bundle\EventStore\Exception\RuntimeException;
use Prooph\Bundle\EventStore\Projection\Projection;
use Prooph\Bundle\EventStore\Projection\ReadModelProjection;
use Prooph\EventStore\Pdo\Projection\GapDetection;
use Prooph\EventStore\Pdo\Projection\PdoEventStoreProjector;
use Psr\Container\ContainerInterface;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
final class ProjectionCommand extends AbstractProjectionCommand
{
use FormatsOutput;
const OPTION_RUN_ONCE = 'run-once';
protected const ARGUMENT_PROJECTION_NAME = 'projection-name';
public function __construct(
private ContainerInterface $projectionManagerForProjectionsLocator,
ContainerInterface $projectionsLocator,
ContainerInterface $projectionReadModelLocator,
ContainerInterface $projectionsOptionsLocator
) {
parent::__construct(
$projectionManagerForProjectionsLocator,
$projectionsLocator,
$projectionReadModelLocator,
$projectionsOptionsLocator
);
}
protected function configure(): void
{
parent::configure();
$this
->setName('event-store:projection:run')
->setDescription('Runs a projection')
->addOption(static::OPTION_RUN_ONCE, 'o', InputOption::VALUE_NONE, 'Loop the projection only once, then exit');
}
protected function execute(InputInterface $input, OutputInterface $output)
{
$keepRunning = !$input->getOption(static::OPTION_RUN_ONCE);
$output->writeln(
sprintf(
'<action>Starting projection <highlight>%s</highlight>. Keep running: <highlight>%s</highlight></action>', $this->projectionName,
$keepRunning ? 'enabled' : 'disabled'
)
);
$projector = $this->projection->project($this->projector);
$projector->run($keepRunning);
$output->writeln(sprintf('<action>Projection <highlight>%s</highlight> completed.</action>', $this->projectionName));
return Command::SUCCESS;
}
protected function initialize(InputInterface $input, OutputInterface $output): void
{
$input->validate();
$this->formatOutput($output);
$this->projectionName = $input->getArgument(static::ARGUMENT_PROJECTION_NAME);
$gapDetection = new GapDetection(
// Configure retries in case a gap is detected
[
0, // First retry without any sleep time
40, // Second retry
100, // third retry
],
new \DateInterval('PT120S')
);
if (!$this->projectionManagerForProjectionsLocator->has($this->projectionName)) {
throw new RuntimeException(sprintf('ProjectionManager for "%s" not found', $this->projectionName));
}
$this->projectionManager = $this->projectionManagerForProjectionsLocator->get($this->projectionName);
if (!$this->projectionsLocator->has($this->projectionName)) {
throw new RuntimeException(sprintf('Projection "%s" not found', $this->projectionName));
}
$this->projection = $this->projectionsLocator->get($this->projectionName);
if ($this->projection instanceof ReadModelProjection) {
if (!$this->projectionReadModelLocator->has($this->projectionName)) {
throw new RuntimeException(sprintf('ReadModel for "%s" not found', $this->projectionName));
}
$this->readModel = $this->projectionReadModelLocator->get($this->projectionName);
$this->projector = $this->projectionManager->createReadModelProjection($this->projectionName, $this->readModel, [
PdoEventStoreProjector::OPTION_GAP_DETECTION => $gapDetection,
]);
}
if ($this->projection instanceof Projection) {
$this->projector = $this->projectionManager->createProjection($this->projectionName, [
PdoEventStoreProjector::OPTION_GAP_DETECTION => $gapDetection,
]);
}
if (null === $this->projector) {
throw new RuntimeException('Projection was not created');
}
$output->writeln(sprintf('<header>Initialized projection "%s"</header>', $this->projectionName));
try {
$state = $this->projectionManager->fetchProjectionStatus($this->projectionName)->getValue();
} catch (\Prooph\EventStore\Exception\RuntimeException) {
$state = 'unknown';
}
$output->writeln(sprintf('<action>Current status: <highlight>%s</highlight></action>', $state));
$output->writeln('=====================');
$output->writeln('GAP DETECTION ENABLED');
$output->writeln('=====================');
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment