Skip to content

Instantly share code, notes, and snippets.

@alle
Forked from fjogeleit/ProjectionRunCommand.php
Created March 26, 2019 10:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save alle/ce936884188dc0ba7910aef42c0d8f3e to your computer and use it in GitHub Desktop.
Save alle/ce936884188dc0ba7910aef42c0d8f3e to your computer and use it in GitHub Desktop.
Prooph RedisPlugin
<?php
declare(strict_types=1);
namespace App\Command;
use Prooph\Bundle\EventStore\Projection\ReadModelProjection;
use Prooph\EventStore\Projection\ProjectionManager;
use Prooph\EventStore\Projection\ReadModel;
use Prooph\EventStore\Projection\ReadModelProjector;
use Psr\Container\ContainerInterface;
use Superbalist\PubSub\PubSubAdapterInterface;
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
class ProjectionRunCommand extends ContainerAwareCommand
{
/**
* @var ContainerInterface
*/
private $projectionManagerLocator;
/**
* @var ContainerInterface
*/
private $projectionsLocator;
/**
* @var ContainerInterface
*/
private $projectionReadModelLocator;
/**
* @var ReadModelProjector[]
*/
private $readModelProjectors = [];
/**
* @var PubSubAdapterInterface
*/
private $pubSubAdapter;
/**
* @var array
*/
private $eventCache = [];
public function __construct(
ContainerInterface $projectionManagerLocator,
ContainerInterface $projectionsLocator,
ContainerInterface $projectionReadModelLocator,
PubSubAdapterInterface $pubSubAdapter
) {
parent::__construct();
$this->projectionManagerLocator = $projectionManagerLocator;
$this->projectionsLocator = $projectionsLocator;
$this->projectionReadModelLocator = $projectionReadModelLocator;
$this->pubSubAdapter = $pubSubAdapter;
}
protected function configure()
{
$this
->setName('app:projection:run')
->setDescription('Runs all projections');
}
protected function execute(InputInterface $input, OutputInterface $output)
{
$io = new SymfonyStyle($input, $output);
$io->title('Starting projection');
$projectManagers = $this->getContainer()->getParameter('prooph_event_store.projection_managers');
$manager = current(array_keys($projectManagers));
/** @var ProjectionManager $projectionManager */
$projectionManager = $this->projectionManagerLocator->get($manager);
$projectionNames = $projectionManager->fetchProjectionNames(null,100);
foreach ($projectionNames as $name) {
/** @var ReadModel $readModel */
$readModel = $this->projectionReadModelLocator->get($name);
/** @var ReadModelProjection $projection */
$projection = $this->projectionsLocator->get($name);
$projector = $projectionManager->createReadModelProjection($name, $readModel);
$this->readModelProjectors[] = $projection->project($projector);
}
$this->pubSubAdapter->subscribe('event_stored', function ($value) use ($io) {
foreach ($this->readModelProjectors as $projector) {
if (true === \array_key_exists($value['event'], $this->managedEvents($projector))) {
$projector->run(false);
}
}
});
}
/**
* @param ReadModelProjector $projector
*
* @return array
*
* @throws \ReflectionException
*/
private function managedEvents(ReadModelProjector $projector): array
{
if (true === \array_key_exists($projector->getName(), $this->eventCache)) {
return $this->eventCache[$projector->getName()];
}
$class = \get_class($projector);
$myClassReflection = new \ReflectionClass($class);
$secret = $myClassReflection->getProperty('handlers');
$secret->setAccessible(true);
$result = \array_keys($secret->getValue($projector));
$combine = array_combine($result, $result);
$this->eventCache[$projector->getName()] = $combine;
return $combine;
}
}
<?php
namespace App\Service;
use Prooph\Common\Event\ActionEvent;
use Prooph\EventSourcing\AggregateChanged;
use Prooph\EventStore\ActionEventEmitterEventStore;
use Prooph\EventStore\Plugin\AbstractPlugin;
use Prooph\EventStore\Stream;
use Superbalist\PubSub\PubSubAdapterInterface;
class RedisPlugin extends AbstractPlugin
{
/**
* @var PubSubAdapterInterface
*/
private $pubSub;
public function __construct(PubSubAdapterInterface $pubSub)
{
$this->pubSub = $pubSub;
}
public function attachToEventStore(ActionEventEmitterEventStore $eventStore): void
{
$this->listenerHandlers[] = $eventStore->attach(ActionEventEmitterEventStore::EVENT_APPEND_TO, function (ActionEvent $event): void {
/** @var AggregateChanged[] $recordedEvents */
$recordedEvents = $event->getParam('streamEvents', new \ArrayIterator());
foreach ($recordedEvents as $recordedEvent) {
$this->pubSub->publish('event_stored' ,[
'event' => \get_class($recordedEvent),
'data' => $recordedEvent->toArray()
]);
}
}, -1);
$this->listenerHandlers[] = $eventStore->attach(ActionEventEmitterEventStore::EVENT_CREATE, function (ActionEvent $event): void {
/** @var Stream $stream */
$stream = $event->getParam('stream');
foreach ($stream->streamEvents() as $recordedEvent) {
$this->pubSub->publish('event_stored' ,[
'event' => \get_class($recordedEvent),
'data' => $recordedEvent->toArray()
]);
}
}, -1);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment