Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Broadway Projection Rebuild
<?php
/**
* Projection Rebuilder
* @author Markus Muschol <markus.muschol@gmx.de>
*/
use Broadway\Domain\DateTime;
use Broadway\Domain\DomainEventStream;
use Broadway\Domain\DomainMessage;
use Broadway\Domain\Metadata;
use Broadway\EventHandling\EventBusInterface;
use Broadway\Serializer\SerializerInterface;
use Elkuku\Console\Helper\ConsoleProgressBar;
use EventStore\EventStore;
use EventStore\StreamFeed\Entry;
use EventStore\StreamFeed\EntryEmbedMode;
use EventStore\StreamFeed\LinkRelation;
use EventStore\StreamFeed\StreamFeed;
/**
* Class BroadwayProjectionRebuilder
* @package CQRS
*/
class BroadwayProjectionRebuilder
{
    /** Event Store url */
    const URL = "http://127.0.0.1:2113";

    /** @var bool Not read or written by any method of this class. */
    private $isConnected = false;

    /** @var EventStore Client used for every feed and event request. */
    private $eventStore;

    /** @var SerializerInterface Turns stored event data back into domain events. */
    private $serializer;

    /**
     * Stores the serializer and creates an Event Store client for self::URL.
     *
     * @param SerializerInterface $serializer
     */
    public function __construct(SerializerInterface $serializer)
    {
        $this->serializer = $serializer;
        $this->eventStore = new EventStore(self::URL);
    }

    /**
     * Replay all event streams and republish all messages to rebuild projection.
     *
     * Walks every page of the '$streams' feed, loads the events of each listed
     * stream and publishes them on the event bus in chunks. Prints a start line
     * and "Done" to standard output.
     *
     * @param EventBusInterface       $eventBus    Bus the reconstructed messages are published on.
     * @param int                     $chunkSize   Number of buffered events that triggers a publish.
     * @param ConsoleProgressBar|null $progressBar Optional progress bar, advanced once per stream.
     */
    public function replayStreams(EventBusInterface $eventBus, $chunkSize = 500, ConsoleProgressBar $progressBar = null)
    {
        /** @var StreamFeed $feed */
        $feed = $this->eventStore->openStreamFeed('$streams', EntryEmbedMode::BODY());
        $totalStreams = $this->getEventCount($feed);

        $events = [];
        $streamCount = 0;

        echo "Rebuilding $totalStreams projections ...\n";

        // Optional show Progressbar
        if ($progressBar) {
            $progressBar->reset('- %fraction% [%bar%] %percent% Elapsed Time: %elapsed%', '=>', '-', 78, $totalStreams);
        }

        do {
            /** @var Entry $entry */
            foreach ($feed->getEntries() as $entry) {
                foreach ($this->fetchEvents($entry->getAggregateId()) as $event) {
                    $events[] = $event;
                }

                // Count the stream first so the bar reports streams completed
                // and can reach its target on the last one.
                $streamCount++;
                if ($progressBar) {
                    $progressBar->update($streamCount);
                }
            }

            // Publish events in chunks to avoid frequent queries
            // on the db and running out of memory
            if (count($events) >= $chunkSize) {
                $eventBus->publish(new DomainEventStream($events));
                $events = [];
            }
        } while (null !== ($feed = $this->eventStore->navigateStreamFeed($feed, LinkRelation::NEXT())));

        // Flush whatever is left below the chunk threshold; without this the
        // events buffered after the last chunk would never be published.
        if (count($events) > 0) {
            $eventBus->publish(new DomainEventStream($events));
        }

        echo "Done";
    }

    /**
     * Fetch all domain events of one stream, oldest first.
     *
     * Per the Event Store Atom API documentation, the `last` link of a stream
     * feed points at the oldest page, `previous` leads towards newer pages and
     * entries inside a page are listed newest first. The stream is therefore
     * read from `last` (or from the head page when no `last` link exists)
     * along `previous`, reversing each page.
     *
     * @param string $id Stream name as taken from the '$streams' feed.
     * @return DomainMessage[]
     */
    private function fetchEvents($id)
    {
        $head = $this->eventStore->openStreamFeed($id);

        // navigateStreamFeed() is treated as nullable everywhere in this class;
        // NOTE(review): a head page without a `last` link is presumably a
        // stream that fits on one page - confirm against the client library.
        $feed = $this->eventStore->navigateStreamFeed($head, LinkRelation::LAST());
        if (null === $feed) {
            $feed = $head;
        }

        $events = [];
        while (null !== $feed) {
            $entries = $feed->getEntries();
            if (empty($entries)) {
                // An empty page means we walked past the newest event.
                break;
            }

            /** @var Entry $entry */
            foreach (array_reverse($entries) as $entry) {
                // Exclude internal streams like $metadata
                if ($this->isDomainEventStream($entry)) {
                    $events[] = $this->reconstructMessage($entry);
                }
            }

            $feed = $this->eventStore->navigateStreamFeed($feed, LinkRelation::PREVIOUS());
        }

        return $events;
    }

    /**
     * Reconstruct Broadway domain message from event.
     *
     * The entry title is split at the first '@' into playhead and aggregate
     * id; the event body is loaded from the entry's event URL and
     * deserialized. Metadata is always recreated empty.
     *
     * @param Entry $entry
     * @return DomainMessage
     */
    private function reconstructMessage(Entry $entry)
    {
        // Limit of 2 keeps aggregate ids that themselves contain '@' intact.
        list($playhead, $uuid) = explode('@', $entry->getTitle(), 2);
        $event = $this->eventStore->readEvent($entry->getEventUrl());

        return new DomainMessage(
            $uuid,
            intval($playhead),
            new Metadata([]),
            $this->serializer->deserialize($event->getData()),
            DateTime::fromString($entry->getUpdate())
        );
    }

    /**
     * Is a valid Domain stream?
     *
     * An entry is accepted when its type does not start with '$'.
     *
     * @param Entry $entry
     * @return bool
     */
    private function isDomainEventStream($entry)
    {
        return substr((string) $entry->getType(), 0, 1) !== '$';
    }

    /**
     * Derive a count from the feed's eTag.
     *
     * Takes the integer in front of the first ';' of the 'eTag' JSON value and
     * adds one. NOTE(review): presumably that number is the last event number
     * of the feed, making the result the number of entries - confirm.
     *
     * @param StreamFeed $feed
     * @return int
     */
    private function getEventCount(StreamFeed $feed)
    {
        $eTag = $feed->getJson()['eTag'];

        return intval(substr($eTag, 0, strpos($eTag, ';'))) + 1;
    }
}
// Extension in Entry class
class Entry
{
    /**
     * Title of the feed entry.
     *
     * @return string Value stored under the 'title' key of the entry JSON.
     */
    public function getTitle()
    {
        return $this->json['title'];
    }

    /**
     * Decoded payload of the entry.
     *
     * @return mixed Result of json_decode() (associative mode) on the 'data'
     *               value, or an empty array when no 'data' key is set.
     */
    public function getData()
    {
        return isset($this->json['data']) ? json_decode($this->json['data'], true) : [];
    }

    /**
     * Aggregate id taken from the entry title.
     *
     * The title is split at the first '@' only, so ids that contain '@'
     * themselves are returned in full.
     *
     * @return string|null Part of the title after the first '@', or null when
     *                     the title contains no '@'.
     */
    public function getAggregateId()
    {
        $parts = explode('@', $this->getTitle(), 2);

        return isset($parts[1]) ? $parts[1] : null;
    }
}
@dbellettini

This comment has been minimized.

Copy link

commented Sep 28, 2014

I see some code smells, and the style is not PSR-compliant. If you send us a pull request, we can review it together.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.