Skip to content

Instantly share code, notes, and snippets.

@bluescreen
Created September 23, 2014 19:01
Show Gist options
  • Star 5 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save bluescreen/9ef790501211d09f2bb3 to your computer and use it in GitHub Desktop.
Save bluescreen/9ef790501211d09f2bb3 to your computer and use it in GitHub Desktop.
Broadway Projection Rebuild
<?php
/**
* Projection Rebuilder
* @author Markus Muschol <markus.muschol@gmx.de>
*/
use Broadway\Domain\DateTime;
use Broadway\Domain\DomainEventStream;
use Broadway\Domain\DomainMessage;
use Broadway\Domain\Metadata;
use Broadway\EventHandling\EventBusInterface;
use Broadway\Serializer\SerializerInterface;
use Elkuku\Console\Helper\ConsoleProgressBar;
use EventStore\EventStore;
use EventStore\StreamFeed\Entry;
use EventStore\StreamFeed\EntryEmbedMode;
use EventStore\StreamFeed\LinkRelation;
use EventStore\StreamFeed\StreamFeed;
/**
* Class BroadwayProjectionRebuilder
* @package CQRS
*/
class BroadwayProjectionRebuilder {
/** Event Store url */
const URL = "http://127.0.0.1:2113";
/** @var bool */
private $isConnected = false;
/** @var EventStore */
private $eventStore;
/** @var SerializerInterface */
private $serializer;
/**
* @param SerializerInterface $serializer
*/
public function __construct(SerializerInterface $serializer){
$this->serializer = $serializer;
$this->eventStore = new EventStore(self::URL);
}
/**
* Replay all event streams and republish all messages to rebuild projection
* @param EventBusInterface $eventBus
* @param int $chunkSize
* @param ConsoleProgressBar $progressBar
*/
public function replayStreams(EventBusInterface $eventBus, $chunkSize = 500, ConsoleProgressBar $progressBar = null){
/** @var StreamFeed $feed */
$feed = $this->eventStore->openStreamFeed('$streams', EntryEmbedMode::BODY());
$totalStreams = $this->getEventCount($feed);
$this->eventStore->navigateStreamFeed($feed, LinkRelation::NEXT());
$events = [];
$streamCount = 0;
$eventCount = 0;
echo "Rebuilding $totalStreams projections ...\n";
// Optional show Progressbar
if($progressBar){
$progressBar->reset('- %fraction% [%bar%] %percent% Elapsed Time: %elapsed%', '=>', '-', 78, $totalStreams);
}
do {
/** @var Entry $entry */
foreach ($feed->getEntries() as $entry) {
$entryEvents = $this->fetchEvents($entry->getAggregateId());
foreach($entryEvents as $event) {
$events[] = $event;
$eventCount++;
}
if($progressBar){
$progressBar->update($streamCount);
}
$streamCount++;
}
// Publish events in chunks to avoid frequent queries
// on the db and running out of memory
if($eventCount > $chunkSize || $totalStreams < $chunkSize){
$eventBus->publish(new DomainEventStream($events));
$events = [];
$eventCount = 0;
}
} while (null !== ($feed = $this->eventStore->navigateStreamFeed($feed, LinkRelation::NEXT())));
echo "Done";
}
/**
* Fetch Events from stream
* @param $id
* @return array
*/
private function fetchEvents($id){
$feed = $this->eventStore->openStreamFeed($id);
$feed = $this->eventStore->navigateStreamFeed($feed, LinkRelation::PREVIOUS());
$events = [];
do {
/** @var Entry $entry */
foreach ($feed->getEntries() as $entry) {
// Exclude internal streams like $metadata
if($this->isDomainEventStream($entry)){
$events[] = $this->reconstructMessage($entry);
}
}
} while (null !== ($feed = $this->eventStore->navigateStreamFeed($feed, LinkRelation::LAST())));
return $events;
}
/**
* Reconstruct Broadway domain message from event
* @param Entry $entry
* @return DomainMessage
*/
private function reconstructMessage(Entry $entry){
list($playhead, $uuid) = explode('@', $entry->getTitle());
$event = $this->eventStore->readEvent($entry->getEventUrl());
return new DomainMessage(
$uuid,
intval($playhead),
new Metadata([]),
$this->serializer->deserialize($event->getData()),
DateTime::fromString($entry->getUpdate())
);
}
/**
* Is a valid Domain stream?
* @param $entry
* @return bool
*/
private function isDomainEventStream($entry){
return substr($entry->getType(),0,1) != '$';
}
/**
* @param StreamFeed $feed
* @return int
*/
private function getEventCount(StreamFeed $feed){
$eTag = $feed->getJson()['eTag'];
return intval(substr($eTag,0,strpos($eTag,';')))+1;
}
}
// Extension in Entry class
class Entry{
public function getTitle(){
return $this->json['title'];
}
public function getData(){
return isset($this->json['data']) ? json_decode($this->json['data'],true) : [];
}
public function getAggregateId(){
$parts = explode('@', $this->getTitle());
return $parts[1];
}
}
@dbellettini
Copy link

i see some bad smells and style is not PSR compliant, if you send us a pull request we can review it together

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment