Skip to content

Instantly share code, notes, and snippets.

@codeliner
Last active May 9, 2022 14:55
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save codeliner/14a8d98d53efafdd35e851d76e89cc94 to your computer and use it in GitHub Desktop.
Save codeliner/14a8d98d53efafdd35e851d76e89cc94 to your computer and use it in GitHub Desktop.
prooph MongoEventStore v7
<?php
declare(strict_types = 1);
namespace Acme\Infrastructure\MongoDb;
use MongoDB\Client;
use MongoDB\Collection;
class MongoConnection
{
/**
* @var Client
*/
private $client;
/**
* @var string
*/
private $dbName;
public function __construct(Client $client, string $dbName)
{
$this->client = $client;
$this->dbName = $dbName;
}
public function client(): Client
{
return $this->client;
}
public function dbName(): string
{
return $this->dbName;
}
public function selectCollection(string $collectionName, array $options = []): Collection
{
return $this->client->selectCollection($this->dbName, $collectionName, $options);
}
public function replaceCollection(string $collectionName, string $withCollection)
{
$adminDb = $this->client->admin;
$cursor = $adminDb->command([
'renameCollection' => $this->dbName . '.' . $withCollection,
'to' => $this->dbName . '.' . $collectionName,
'dropTarget' => true
]);
return current($cursor->toArray());
}
}
<?php
declare(strict_types = 1);
namespace Acme\Infrastructure\MongoDb;
use Iterator;
use MongoDB\Collection;
use MongoDB\Driver\Cursor;
use MongoDB\Driver\Exception\BulkWriteException;
use MongoDB\Operation\FindOneAndUpdate;
use Prooph\Common\Messaging\Message;
use Prooph\Common\Messaging\MessageConverter;
use Prooph\Common\Messaging\MessageFactory;
use Prooph\EventStore\EventStore;
use Prooph\EventStore\Exception\StreamNotFound;
use Prooph\EventStore\Metadata\MetadataMatcher;
use Prooph\EventStore\Metadata\Operator;
use Prooph\EventStore\Projection\Projection;
use Prooph\EventStore\Projection\ProjectionFactory;
use Prooph\EventStore\Projection\ProjectionOptions;
use Prooph\EventStore\Projection\Query;
use Prooph\EventStore\Projection\QueryFactory;
use Prooph\EventStore\Projection\ReadModel;
use Prooph\EventStore\Projection\ReadModelProjection;
use Prooph\EventStore\Projection\ReadModelProjectionFactory;
use Prooph\EventStore\Stream;
use Prooph\EventStore\StreamName;
final class MongoEventStore implements EventStore
{
const STREAM_COLLECTION = 'streams';
/**
* @var MongoConnection
*/
private $mongoConnection;
/**
* @var MessageFactory
*/
private $messageFactory;
/**
* @var MessageConverter
*/
private $messageConverter;
/**
* @var array
*/
private $aggregateStreamNames;
public function __construct(MongoConnection $mongoConnection, MessageFactory $messageFactory, MessageConverter $messageConverter, array $aggregateStreamNames)
{
$this->mongoConnection = $mongoConnection;
$this->messageFactory = $messageFactory;
$this->messageConverter = $messageConverter;
$this->aggregateStreamNames = $aggregateStreamNames;
}
public function fetchStreamMetadata(StreamName $streamName): array
{
$doc = $this->mongoConnection->selectCollection(self::STREAM_COLLECTION)
->findOne(['_id' => $streamName->toString()]);
if (!$doc) {
throw StreamNotFound::with($streamName);
}
return $doc['metadata'];
}
public function hasStream(StreamName $streamName): bool
{
return (bool)$this->mongoConnection->selectCollection(self::STREAM_COLLECTION)
->count(['_id' => $streamName->toString()]);
}
public function create(Stream $stream): void
{
if (iterator_count($stream->streamEvents()) > 0) {
throw new \RuntimeException(__CLASS__ . ' does not support creating a stream and appending events in one operation.');
}
$streamDoc = ['_id' => $stream->streamName()->toString(), 'metadata' => $stream->metadata(), 'seq' => 0];
$this->mongoConnection->selectCollection(self::STREAM_COLLECTION)->insertOne($streamDoc);
}
public function appendTo(StreamName $streamName, Iterator $streamEvents): void
{
if (iterator_count($streamEvents) > 1) {
throw new \RuntimeException('Due to limited ACID support you can only append one event per operation to the event stream: ' . $streamName->toString());
}
foreach($streamEvents as $event) {
$this->insertInto($streamName, $this->prepareEventData($event));
}
}
public function load(
StreamName $streamName,
int $fromNumber = 1,
int $count = null,
MetadataMatcher $metadataMatcher = null
): Stream
{
$collection = $this->getCollectionByStreamName($streamName);
if (null === $metadataMatcher) {
$metadataMatcher = new MetadataMatcher();
}
$query = $this->buildQuery($metadataMatcher);
$query['no'] = ['$gte' => $fromNumber];
$options = [
'sort' => ['no' => 1]
];
if ($count) {
$options['limit'] = $count;
}
$doc = $collection->findOne($query, $options);
if(!$doc) {
return new Stream($streamName, new \ArrayIterator([]));
}
$cursor = $collection->find($query, $options);
$iterator = $this->mapCursor($cursor, function (array $event) {
return $this->eventDataToMessage($event);
});
return new Stream($streamName, $iterator);
}
public function loadReverse(
StreamName $streamName,
int $fromNumber = PHP_INT_MAX,
int $count = null,
MetadataMatcher $metadataMatcher = null
): Stream
{
$collection = $this->getCollectionByStreamName($streamName);
if (null === $metadataMatcher) {
$metadataMatcher = new MetadataMatcher();
}
$query = $this->buildQuery($metadataMatcher);
$query['no'] = ['$lte' => $fromNumber];
$options = [
'sort' => ['no' => -1]
];
if ($count) {
$options['limit'] = $count;
}
$cursor = $collection->find($query, $options);
$iterator = $this->mapCursor($cursor, function (array $event) {
return $this->eventDataToMessage($event);
});
return new Stream($streamName, $iterator);
}
public function delete(StreamName $streamName): void
{
//Note: this is not transaction save.
//However, delete should only be called for projection streams and mongodb will recreate an empty
//stream collection if it not exists. So self::hasStream can return true even if there is no stream collection
//but only the ref in the streams collection (scenario if first cmd succeed but second fails)
$this->mongoConnection->selectCollection($streamName->toString())
->drop();
$this->mongoConnection->selectCollection(self::STREAM_COLLECTION)
->deleteOne(['_id' => $streamName->toString()]);
}
/**
* @param Message $e
* @return array
*/
private function prepareEventData(Message $e)
{
$eventArr = $this->messageConverter->convertToArray($e);
$eventData = [
'_id' => $eventArr['uuid'],
'event_name' => $eventArr['message_name'],
'payload' => $eventArr['payload'],
'created_at' => $eventArr['created_at']->format('Y-m-d\TH:i:s.u'),
'metadata' => $eventArr['metadata']
];
return $eventData;
}
private function eventDataToMessage(array $eventData): Message
{
$createdAt = \DateTimeImmutable::createFromFormat(
'Y-m-d\TH:i:s.u',
$eventData['created_at'],
new \DateTimeZone('UTC')
);
return $this->messageFactory->createMessageFromArray($eventData['event_name'], [
'uuid' => $eventData['_id'],
'created_at' => $createdAt,
'payload' => $eventData['payload'],
'metadata' => $eventData['metadata']
]);
}
private function getCollectionByStreamName(StreamName $streamName): Collection
{
$streamName = $streamName->toString();
$collection = $this->mongoConnection->selectCollection($streamName);
$collection->createIndex([
'no' => 1
], ['unique' => true, 'name' => 'no_idx']);
if (in_array($streamName, $this->aggregateStreamNames)) {
$collection->createIndex([
'metadata._aggregate_id' => 1,
'metadata._aggregate_version' => 1
], ['unique' => true, 'name' => 'aggregate_version_idx']);
}
return $collection;
}
private function buildQuery(MetadataMatcher $matcher): array
{
$query = [];
foreach ($matcher->data() as $match) {
$field = $match['field'];
$operator = $match['operator']->getValue();
$value = $match['value'];
switch ($operator) {
case Operator::EQUALS:
$operator = '$eq';
break;
case Operator::GREATER_THAN:
$operator = '$gt';
break;
case Operator::GREATER_THAN_EQUALS:
$operator = '$gte';
break;
case Operator::LOWER_THAN:
$operator = '$lt';
break;
case Operator::LOWER_THAN_EQUALS:
$operator = '$lte';
break;
case Operator::NOT_EQUALS:
$operator = '$ne';
break;
}
$query['metadata.' . $field] = [$operator => $value];
}
return $query;
}
private function insertInto(StreamName $streamName, array $eventData): void
{
$streamInfo = $this->mongoConnection->selectCollection(self::STREAM_COLLECTION)
->findOneAndUpdate(
['_id' => $streamName->toString()],
['$inc' => ['seq' => 1]],
['returnDocument' => FindOneAndUpdate::RETURN_DOCUMENT_AFTER]
);
if(!$streamInfo) {
throw StreamNotFound::with($streamName);
}
$eventData['no'] = $streamInfo['seq'];
try {
$this->getCollectionByStreamName($streamName)->insertOne($eventData);
} catch (BulkWriteException $e) {
$this->mongoConnection->selectCollection(self::STREAM_COLLECTION)->updateOne(
['_id' => $streamName->toString()],
['$inc' => ['seq' => -1]]
);
throw $e;
}
}
private function mapCursor(Cursor $cursor, callable $callback): \IteratorIterator
{
return new class($cursor, $callback) extends \IteratorIterator {
/**
* The function to be apply on all InnerIterator element
*
* @var callable
*/
private $callable;
private $currentKey;
private $currentVal;
/**
* The Constructor
*
* @param Cursor $cursor
* @param callable $callable
*/
public function __construct(Cursor $cursor, callable $callable)
{
parent::__construct($cursor);
$this->callable = $callable;
}
public function valid(): bool
{
return !$this->getInnerIterator()->isDead();
}
/**
* Get the value of the current element
*/
public function current()
{
$callback = $this->callable;
return $callback(parent::current(), parent::key());
}
};
}
public function updateStreamMetadata(StreamName $streamName, array $newMetadata): void
{
// TODO: Implement updateStreamMetadata() method.
}
public function createQuery(QueryFactory $factory = null): Query
{
// TODO: Implement createQuery() method.
}
public function createProjection(
string $name,
ProjectionOptions $options = null,
ProjectionFactory $factory = null): Projection
{
// TODO: Implement createProjection() method.
}
public function createReadModelProjection(
string $name,
ReadModel $readModel,
ProjectionOptions $options = null,
ReadModelProjectionFactory $factory = null
): ReadModelProjection
{
if (null === $options) {
$options = new ProjectionOptions();
}
return new MongoReadModelProjection(
$this,
$name,
$readModel,
$this->mongoConnection,
$options->cacheSize(),
$options->persistBlockSize()
);
}
public function getDefaultQueryFactory(): QueryFactory
{
throw new \BadMethodCallException(__METHOD__ . ' not supported');
}
public function getDefaultProjectionFactory(): ProjectionFactory
{
throw new \BadMethodCallException(__METHOD__ . ' not supported');
}
public function getDefaultReadModelProjectionFactory(): ReadModelProjectionFactory
{
throw new \BadMethodCallException(__METHOD__ . ' not supported');
}
}
<?php
declare(strict_types = 1);
namespace Acme\Infrastructure\MongoDb;
use Closure;
use Acme\Model\Message;
use MongoDB\Exception\BadMethodCallException;
use Prooph\EventStore\EventStore;
use Prooph\EventStore\Exception\RuntimeException;
use Prooph\EventStore\Exception\StreamNotFound;
use Prooph\EventStore\Projection\ReadModel;
use Prooph\EventStore\Projection\ReadModelProjection;
use Prooph\EventStore\StreamName;
final class MongoReadModelProjection implements ReadModelProjection
{
const PROJECTIONS_COLLECTION = 'projections';
const LOCK_TIMEOUT_MS = 1000;
/**
* @var string
*/
private $name;
/**
* @var EventStore
*/
private $eventStore;
/**
* @var ReadModel
*/
private $readModel;
/**
* @var MongoConnection
*/
private $mongoConnection;
/**
* @var array
*/
private $streamPositions;
/**
* @var array
*/
private $state = [];
/**
* @var callable|null
*/
private $initCallback;
/**
* @var Closure|null
*/
private $handler;
/**
* @var array
*/
private $handlers = [];
private $currentStreamName = null;
private $isStopped = false;
/**
* @var int
*/
private $eventCounter = 0;
private $cacheSize;
private $persistBlockSize;
public function __construct(
EventStore $eventStore,
string $name,
ReadModel $readModel,
MongoConnection $mongoConnection,
int $cacheSize,
int $persistBlockSize
) {
$this->eventStore = $eventStore;
$this->name = $name;
$this->readModel = $readModel;
$this->mongoConnection = $mongoConnection;
$this->cacheSize = $cacheSize;
$this->persistBlockSize = $persistBlockSize;
}
/**
* The callback has to return an array
*/
public function init(Closure $callback): ReadModelProjection
{
if (null !== $this->initCallback) {
throw new \RuntimeException('Projection already initialized');
}
$callback = Closure::bind($callback, $this->createHandlerContext($this->currentStreamName));
$result = $callback();
if (is_array($result)) {
$this->state = $result;
}
$this->initCallback = $callback;
return $this;
}
public function fromStream(string $streamName): ReadModelProjection
{
if (null !== $this->streamPositions) {
throw new \RuntimeException('From was already called');
}
$this->streamPositions = [$streamName => 0];
return $this;
}
public function fromStreams(string ...$streamNames): ReadModelProjection
{
if (null !== $this->streamPositions) {
throw new \RuntimeException('From was already called');
}
foreach ($streamNames as $streamName) {
$this->streamPositions[$streamName] = 0;
}
return $this;
}
public function when(array $handlers): ReadModelProjection
{
if (null !== $this->handler || ! empty($this->handlers)) {
throw new \RuntimeException('When was already called');
}
foreach ($handlers as $eventName => $handler) {
if (! is_string($eventName)) {
throw new \InvalidArgumentException('Invalid event name given, string expected');
}
if (! $handler instanceof Closure) {
throw new \InvalidArgumentException('Invalid handler given, Closure expected');
}
$this->handlers[$eventName] = Closure::bind($handler, $this->createHandlerContext($this->currentStreamName));
}
return $this;
}
public function whenAny(Closure $handler): ReadModelProjection
{
if (null !== $this->handler || ! empty($this->handlers)) {
throw new \RuntimeException('When was already called');
}
$this->handler = Closure::bind($handler, $this->createHandlerContext($this->currentStreamName));
return $this;
}
public function stop(): void
{
$this->isStopped = true;
}
public function getState(): array
{
return $this->state;
}
public function getName(): string
{
return $this->name;
}
public function delete(bool $deleteProjection): void
{
if($deleteProjection) {
$this->readModel->delete();
}
}
public function readModel(): ReadModel
{
return $this->readModel;
}
public function run(bool $keepRunning = true, ?int $usleep = 100): void
{
$this->createProjectionIfNotExist();
$this->acquireLock();
try {
do {
$this->load();
$singleHandler = null !== $this->handler;
foreach ($this->streamPositions as $streamName => $position) {
try {
$stream = $this->eventStore->load(new StreamName($streamName), $position + 1);
} catch (StreamNotFound $e) {
// no newer events found
continue;
}
if ($singleHandler) {
$this->handleStreamWithSingleHandler($streamName, $stream->streamEvents());
} else {
$this->handleStreamWithHandlers($streamName, $stream->streamEvents());
}
if ($this->isStopped) {
break;
}
}
if (0 === $this->eventCounter) {
if (null !== $usleep) {
usleep($usleep);
}
} else {
$this->persist();
}
$this->eventCounter = 0;
} while ($keepRunning && ! $this->isStopped);
} finally {
$this->releaseLock();
}
}
protected function createProjectionIfNotExist(): void
{
$col = $this->mongoConnection->selectCollection(self::PROJECTIONS_COLLECTION);
try {
$col->insertOne([
'_id' => $this->name,
'position' => [],
'state' => [],
'locked_until' => null
]);
} catch (\Throwable $ex) {
//ignore errors especially duplicate key errors
}
}
protected function load(): void
{
$col = $this->mongoConnection->selectCollection(self::PROJECTIONS_COLLECTION);
$doc = $col->findOne(['_id' => $this->name]);
if (!$doc) {
throw new \RuntimeException('Projection information for ' . $this->name . ' missing in collection ' . self::PROJECTIONS_COLLECTION);
}
$this->streamPositions = $doc['position'];
$state = $doc['state'];
if (! empty($state)) {
$this->state = $state;
}
}
protected function persist(): void
{
$this->readModel()->persist();
$now = new \DateTimeImmutable('now', new \DateTimeZone('UTC'));
$lockUntilString = $now->modify('+' . self::LOCK_TIMEOUT_MS . ' ms')->format('Y-m-d\TH:i:s.u');
$col = $this->mongoConnection->selectCollection(self::PROJECTIONS_COLLECTION);
$col->updateOne([
'_id' => $this->name
], [
'$set' => [
'position' => $this->streamPositions,
'state' => $this->state,
'locked_until' => $lockUntilString
]
]);
}
public function reset(): void
{
$this->createProjectionIfNotExist();
if (null !== $this->streamPositions) {
$this->streamPositions = array_map(
function (): int {
return 0;
},
$this->streamPositions
);
}
$this->isStopped = false;
$callback = $this->initCallback;
if (is_callable($callback)) {
$result = $callback();
if (is_array($result)) {
$this->state = $result;
}
} else {
$this->state = [];
}
$this->eventStore->delete(new StreamName($this->name));
$this->readModel->reset();
$this->acquireLock();
$this->persist();
$this->releaseLock();
}
/**
* @throws RuntimeException
*/
protected function acquireLock(): void
{
$now = new \DateTimeImmutable('now', new \DateTimeZone('UTC'));
$nowString = $now->format('Y-m-d\TH:i:s.u');
$lockUntilString = $now->modify('+' . self::LOCK_TIMEOUT_MS . ' ms')->format('Y-m-d\TH:i:s.u');
$result = $this->mongoConnection->selectCollection(self::PROJECTIONS_COLLECTION)->updateOne([
'_id' => $this->name,
'$or' => [
[
'locked_until' => null,
],
[
'locked_until' => ['$lt' => $nowString]
]
]
], [
'$set' => [
'locked_until' => $lockUntilString
]
]);
if ($result->getMatchedCount() !== 1) {
throw new RuntimeException('Another projection process is already running');
}
}
protected function releaseLock(): void
{
$this->mongoConnection->selectCollection(self::PROJECTIONS_COLLECTION)->updateOne([
'_id' => $this->name,
], [
'$set' => [
'locked_until' => null
]
]);
}
public function fromCategory(string $name): ReadModelProjection
{
throw new BadMethodCallException(__METHOD__ . ' not supported');
}
public function fromCategories(string ...$names): ReadModelProjection
{
throw new BadMethodCallException(__METHOD__ . ' not supported');
}
public function fromAll(): ReadModelProjection
{
throw new BadMethodCallException(__METHOD__ . ' not supported');
}
private function handleStreamWithSingleHandler(string $streamName, \Iterator $events): void
{
$this->currentStreamName = $streamName;
$handler = $this->handler;
foreach ($events as $event) {
/* @var Message $event */
$this->streamPositions[$streamName]++;
$this->eventCounter++;
$result = $handler($this->state, $event);
if (is_array($result)) {
$this->state = $result;
}
if ($this->eventCounter === $this->persistBlockSize) {
$this->persist();
$this->eventCounter = 0;
}
if ($this->isStopped) {
break;
}
}
}
private function handleStreamWithHandlers(string $streamName, \Iterator $events): void
{
$this->currentStreamName = $streamName;
foreach ($events as $event) {
/* @var Message $event */
$this->streamPositions[$streamName]++;
$this->eventCounter++;
if (! isset($this->handlers[$event->messageName()])) {
continue;
}
$handler = $this->handlers[$event->messageName()];
$result = $handler($this->state, $event);
if (is_array($result)) {
$this->state = $result;
}
if ($this->eventCounter === $this->persistBlockSize) {
$this->persist();
$this->eventCounter = 0;
}
if ($this->isStopped) {
break;
}
}
}
private function createHandlerContext(?string &$streamName)
{
return new class($this, $streamName) {
/**
* @var ReadModelProjection
*/
private $projection;
/**
* @var ?string
*/
private $streamName;
public function __construct(ReadModelProjection $projection, ?string &$streamName)
{
$this->projection = $projection;
$this->streamName = &$streamName;
}
public function stop(): void
{
$this->projection->stop();
}
public function readModel(): ReadModel
{
return $this->projection->readModel();
}
public function streamName(): ?string
{
return $this->streamName;
}
};
}
}
@codeliner
Copy link
Author

Note: This is an example of a MongoDb event store implementing the Prooph\EventStore v7 basic interface.
Due t limited ACID support we need to use some "dirty" hacks and accept some limitations.

Limitations:

  • You cannot create a new stream and append events to it in one operation.
  • You cannot append more than one event to a stream in one operation.

Dirty:

  • MongoEventStore::insertInto maintains a sequence per stream but is not transaction-safe
  • MongoEventStore::delete is not transaction-safe

When and why should you use the MongoEventStore

  • If you <3 MongoDb for projections, want to use a single database for your system and you can live with the tradeoffs

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment