Prooph Projection with GapDetection
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
declare(strict_types=1); | |
/** | |
* TRIFFT BACKEND | |
* @copyright Copyright (c) 2019 TRIFFT ME s.r.o. (https://www.trifft.me) | |
* @author Matus Nickel <matus@trifft.me> | |
*/ | |
namespace App\Infrastructure\Command; | |
use Prooph\Bundle\EventStore\Command\AbstractProjectionCommand; | |
use Prooph\Bundle\EventStore\Command\FormatsOutput; | |
use Prooph\Bundle\EventStore\Exception\RuntimeException; | |
use Prooph\Bundle\EventStore\Projection\Projection; | |
use Prooph\Bundle\EventStore\Projection\ReadModelProjection; | |
use Prooph\EventStore\Pdo\Projection\GapDetection; | |
use Prooph\EventStore\Pdo\Projection\PdoEventStoreProjector; | |
use Psr\Container\ContainerInterface; | |
use Symfony\Component\Console\Command\Command; | |
use Symfony\Component\Console\Input\InputInterface; | |
use Symfony\Component\Console\Input\InputOption; | |
use Symfony\Component\Console\Output\OutputInterface; | |
final class ProjectionCommand extends AbstractProjectionCommand | |
{ | |
use FormatsOutput; | |
const OPTION_RUN_ONCE = 'run-once'; | |
protected const ARGUMENT_PROJECTION_NAME = 'projection-name'; | |
public function __construct( | |
private ContainerInterface $projectionManagerForProjectionsLocator, | |
ContainerInterface $projectionsLocator, | |
ContainerInterface $projectionReadModelLocator, | |
ContainerInterface $projectionsOptionsLocator | |
) { | |
parent::__construct( | |
$projectionManagerForProjectionsLocator, | |
$projectionsLocator, | |
$projectionReadModelLocator, | |
$projectionsOptionsLocator | |
); | |
} | |
protected function configure(): void | |
{ | |
parent::configure(); | |
$this | |
->setName('event-store:projection:run') | |
->setDescription('Runs a projection') | |
->addOption(static::OPTION_RUN_ONCE, 'o', InputOption::VALUE_NONE, 'Loop the projection only once, then exit'); | |
} | |
protected function execute(InputInterface $input, OutputInterface $output) | |
{ | |
$keepRunning = !$input->getOption(static::OPTION_RUN_ONCE); | |
$output->writeln( | |
sprintf( | |
'<action>Starting projection <highlight>%s</highlight>. Keep running: <highlight>%s</highlight></action>', $this->projectionName, | |
$keepRunning ? 'enabled' : 'disabled' | |
) | |
); | |
$projector = $this->projection->project($this->projector); | |
$projector->run($keepRunning); | |
$output->writeln(sprintf('<action>Projection <highlight>%s</highlight> completed.</action>', $this->projectionName)); | |
return Command::SUCCESS; | |
} | |
protected function initialize(InputInterface $input, OutputInterface $output): void | |
{ | |
$input->validate(); | |
$this->formatOutput($output); | |
$this->projectionName = $input->getArgument(static::ARGUMENT_PROJECTION_NAME); | |
$gapDetection = new GapDetection( | |
// Configure retries in case a gap is detected | |
[ | |
0, // First retry without any sleep time | |
40, // Second retry | |
100, // third retry | |
], | |
new \DateInterval('PT120S') | |
); | |
if (!$this->projectionManagerForProjectionsLocator->has($this->projectionName)) { | |
throw new RuntimeException(sprintf('ProjectionManager for "%s" not found', $this->projectionName)); | |
} | |
$this->projectionManager = $this->projectionManagerForProjectionsLocator->get($this->projectionName); | |
if (!$this->projectionsLocator->has($this->projectionName)) { | |
throw new RuntimeException(sprintf('Projection "%s" not found', $this->projectionName)); | |
} | |
$this->projection = $this->projectionsLocator->get($this->projectionName); | |
if ($this->projection instanceof ReadModelProjection) { | |
if (!$this->projectionReadModelLocator->has($this->projectionName)) { | |
throw new RuntimeException(sprintf('ReadModel for "%s" not found', $this->projectionName)); | |
} | |
$this->readModel = $this->projectionReadModelLocator->get($this->projectionName); | |
$this->projector = $this->projectionManager->createReadModelProjection($this->projectionName, $this->readModel, [ | |
PdoEventStoreProjector::OPTION_GAP_DETECTION => $gapDetection, | |
]); | |
} | |
if ($this->projection instanceof Projection) { | |
$this->projector = $this->projectionManager->createProjection($this->projectionName, [ | |
PdoEventStoreProjector::OPTION_GAP_DETECTION => $gapDetection, | |
]); | |
} | |
if (null === $this->projector) { | |
throw new RuntimeException('Projection was not created'); | |
} | |
$output->writeln(sprintf('<header>Initialized projection "%s"</header>', $this->projectionName)); | |
try { | |
$state = $this->projectionManager->fetchProjectionStatus($this->projectionName)->getValue(); | |
} catch (\Prooph\EventStore\Exception\RuntimeException) { | |
$state = 'unknown'; | |
} | |
$output->writeln(sprintf('<action>Current status: <highlight>%s</highlight></action>', $state)); | |
$output->writeln('====================='); | |
$output->writeln('GAP DETECTION ENABLED'); | |
$output->writeln('====================='); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment