Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
prooph MongoEventStore v7
<?php
declare(strict_types = 1);
namespace Acme\Infrastructure\MongoDb;
use MongoDB\Client;
use MongoDB\Collection;
/**
 * Couples a MongoDB client with the name of the one database the application works with.
 *
 * Collection-level helpers resolve collection names against that database so callers
 * never have to pass the database name around themselves.
 */
class MongoConnection
{
    /**
     * Client every operation is sent through.
     *
     * @var Client
     */
    private $client;

    /**
     * Name of the database that collection names are resolved against.
     *
     * @var string
     */
    private $dbName;

    /**
     * @param Client $client MongoDB client to wrap
     * @param string $dbName database used by selectCollection() and replaceCollection()
     */
    public function __construct(Client $client, string $dbName)
    {
        $this->dbName = $dbName;
        $this->client = $client;
    }

    /**
     * Returns the wrapped client.
     */
    public function client(): Client
    {
        return $this->client;
    }

    /**
     * Returns the configured database name.
     */
    public function dbName(): string
    {
        return $this->dbName;
    }

    /**
     * Selects a collection of the configured database.
     *
     * @param string $collectionName collection to select
     * @param array  $options        passed through unchanged to the client
     */
    public function selectCollection(string $collectionName, array $options = []): Collection
    {
        $database = $this->dbName;

        return $this->client->selectCollection($database, $collectionName, $options);
    }

    /**
     * Renames $withCollection to $collectionName, dropping an existing target collection.
     *
     * The rename is issued as a `renameCollection` command against the `admin` database,
     * using fully qualified "<db>.<collection>" names for source and target.
     *
     * @param string $collectionName name the collection should have afterwards
     * @param string $withCollection collection that takes over that name
     *
     * @return mixed first entry of the command result, or false when the result is empty
     */
    public function replaceCollection(string $collectionName, string $withCollection)
    {
        // The command name has to stay the first key of the command document.
        $renameCommand = [
            'renameCollection' => "{$this->dbName}.{$withCollection}",
            'to' => "{$this->dbName}.{$collectionName}",
            'dropTarget' => true,
        ];

        $replies = $this->client->admin->command($renameCommand)->toArray();

        return current($replies);
    }
}
<?php
declare(strict_types = 1);
namespace Acme\Infrastructure\MongoDb;
use Iterator;
use MongoDB\Collection;
use MongoDB\Driver\Cursor;
use MongoDB\Driver\Exception\BulkWriteException;
use MongoDB\Operation\FindOneAndUpdate;
use Prooph\Common\Messaging\Message;
use Prooph\Common\Messaging\MessageConverter;
use Prooph\Common\Messaging\MessageFactory;
use Prooph\EventStore\EventStore;
use Prooph\EventStore\Exception\StreamNotFound;
use Prooph\EventStore\Metadata\MetadataMatcher;
use Prooph\EventStore\Metadata\Operator;
use Prooph\EventStore\Projection\Projection;
use Prooph\EventStore\Projection\ProjectionFactory;
use Prooph\EventStore\Projection\ProjectionOptions;
use Prooph\EventStore\Projection\Query;
use Prooph\EventStore\Projection\QueryFactory;
use Prooph\EventStore\Projection\ReadModel;
use Prooph\EventStore\Projection\ReadModelProjection;
use Prooph\EventStore\Projection\ReadModelProjectionFactory;
use Prooph\EventStore\Stream;
use Prooph\EventStore\StreamName;
/**
 * Event store that keeps every stream in its own MongoDB collection.
 *
 * A document per stream in the `streams` collection holds the stream metadata and a
 * sequence counter (`seq`). Events are stored in a collection named after the stream and
 * carry the sequence value in their `no` field.
 *
 * The operations below are built from several independent MongoDB commands and are
 * therefore not atomic; see insertInto() and delete() for the consequences.
 */
final class MongoEventStore implements EventStore
{
    const STREAM_COLLECTION = 'streams';

    /**
     * Translation table from prooph metadata operators to MongoDB query operators.
     */
    private const OPERATOR_MAP = [
        Operator::EQUALS => '$eq',
        Operator::GREATER_THAN => '$gt',
        Operator::GREATER_THAN_EQUALS => '$gte',
        Operator::LOWER_THAN => '$lt',
        Operator::LOWER_THAN_EQUALS => '$lte',
        Operator::NOT_EQUALS => '$ne',
    ];

    /**
     * @var MongoConnection
     */
    private $mongoConnection;

    /**
     * @var MessageFactory
     */
    private $messageFactory;

    /**
     * @var MessageConverter
     */
    private $messageConverter;

    /**
     * Names of the streams that get the unique aggregate id/version index.
     *
     * @var array
     */
    private $aggregateStreamNames;

    /**
     * @param MongoConnection  $mongoConnection      connection used for all collections
     * @param MessageFactory   $messageFactory       rebuilds messages from stored documents
     * @param MessageConverter $messageConverter     turns messages into arrays for storage
     * @param array            $aggregateStreamNames stream names (strings) holding aggregate events
     */
    public function __construct(
        MongoConnection $mongoConnection,
        MessageFactory $messageFactory,
        MessageConverter $messageConverter,
        array $aggregateStreamNames
    ) {
        $this->mongoConnection = $mongoConnection;
        $this->messageFactory = $messageFactory;
        $this->messageConverter = $messageConverter;
        $this->aggregateStreamNames = $aggregateStreamNames;
    }

    /**
     * Returns the metadata stored for the given stream.
     *
     * NOTE(review): the `array` return type presumably relies on the client being
     * configured with an array type map — confirm against the client setup.
     *
     * @throws StreamNotFound when the streams collection has no document for the stream
     */
    public function fetchStreamMetadata(StreamName $streamName): array
    {
        $doc = $this->mongoConnection->selectCollection(self::STREAM_COLLECTION)
            ->findOne(['_id' => $streamName->toString()]);

        if (! $doc) {
            throw StreamNotFound::with($streamName);
        }

        return $doc['metadata'];
    }

    /**
     * Tells whether the streams collection contains a document for the given stream.
     */
    public function hasStream(StreamName $streamName): bool
    {
        return (bool) $this->mongoConnection->selectCollection(self::STREAM_COLLECTION)
            ->count(['_id' => $streamName->toString()]);
    }

    /**
     * Registers a new, empty stream with its metadata and a sequence counter of 0.
     *
     * @throws \RuntimeException when the stream already carries events
     */
    public function create(Stream $stream): void
    {
        if (iterator_count($stream->streamEvents()) > 0) {
            throw new \RuntimeException(__CLASS__ . ' does not support creating a stream and appending events in one operation.');
        }

        $streamDoc = ['_id' => $stream->streamName()->toString(), 'metadata' => $stream->metadata(), 'seq' => 0];

        $this->mongoConnection->selectCollection(self::STREAM_COLLECTION)->insertOne($streamDoc);
    }

    /**
     * Appends at most one event to the given stream.
     *
     * @throws \RuntimeException when more than one event is passed
     * @throws StreamNotFound    when the stream was never created
     */
    public function appendTo(StreamName $streamName, Iterator $streamEvents): void
    {
        // Read the iterator exactly once: counting it first and iterating it again
        // afterwards fails for iterators that cannot be rewound (e.g. generators).
        $events = iterator_to_array($streamEvents, false);

        if (count($events) > 1) {
            throw new \RuntimeException('Due to limited ACID support you can only append one event per operation to the event stream: ' . $streamName->toString());
        }

        foreach ($events as $event) {
            $this->insertInto($streamName, $this->prepareEventData($event));
        }
    }

    /**
     * Loads the events of a stream in ascending `no` order.
     *
     * @param StreamName           $streamName      stream to read
     * @param int                  $fromNumber      lowest `no` to include
     * @param int|null             $count           maximum number of events; a falsy value means no limit
     * @param MetadataMatcher|null $metadataMatcher additional constraints on event metadata
     *
     * @return Stream an empty stream when no event matches
     */
    public function load(
        StreamName $streamName,
        int $fromNumber = 1,
        int $count = null,
        MetadataMatcher $metadataMatcher = null
    ): Stream {
        $collection = $this->getCollectionByStreamName($streamName);
        $query = $this->buildEventQuery($metadataMatcher, '$gte', $fromNumber);
        $options = $this->buildFindOptions(1, $count);

        // Probe with findOne() and short-circuit to an empty in-memory stream.
        // NOTE(review): this costs one extra round trip per load; presumably it protects
        // the cursor iterator of mapCursor() from empty results — confirm before removing.
        // loadReverse() has no such guard.
        if (! $collection->findOne($query, $options)) {
            return new Stream($streamName, new \ArrayIterator([]));
        }

        return $this->createStream($streamName, $collection->find($query, $options));
    }

    /**
     * Loads the events of a stream in descending `no` order.
     *
     * @param StreamName           $streamName      stream to read
     * @param int                  $fromNumber      highest `no` to include
     * @param int|null             $count           maximum number of events; a falsy value means no limit
     * @param MetadataMatcher|null $metadataMatcher additional constraints on event metadata
     */
    public function loadReverse(
        StreamName $streamName,
        int $fromNumber = PHP_INT_MAX,
        int $count = null,
        MetadataMatcher $metadataMatcher = null
    ): Stream {
        $collection = $this->getCollectionByStreamName($streamName);
        $query = $this->buildEventQuery($metadataMatcher, '$lte', $fromNumber);
        $options = $this->buildFindOptions(-1, $count);

        return $this->createStream($streamName, $collection->find($query, $options));
    }

    /**
     * Drops the stream's event collection and removes its entry from the streams collection.
     */
    public function delete(StreamName $streamName): void
    {
        // Note: this is not transaction-safe.
        // However, delete should only be called for projection streams and MongoDB will
        // recreate an empty stream collection if it does not exist. So self::hasStream can
        // return true even if there is no stream collection but only the reference in the
        // streams collection (scenario: the first command succeeds but the second fails).
        $this->mongoConnection->selectCollection($streamName->toString())
            ->drop();

        $this->mongoConnection->selectCollection(self::STREAM_COLLECTION)
            ->deleteOne(['_id' => $streamName->toString()]);
    }

    /**
     * Stores new metadata for an existing stream.
     *
     * @throws StreamNotFound when the streams collection has no document for the stream
     */
    public function updateStreamMetadata(StreamName $streamName, array $newMetadata): void
    {
        $result = $this->mongoConnection->selectCollection(self::STREAM_COLLECTION)->updateOne(
            ['_id' => $streamName->toString()],
            ['$set' => ['metadata' => $newMetadata]]
        );

        // The matched count (not the modified count) is checked so that writing
        // identical metadata again is not mistaken for a missing stream.
        if (0 === $result->getMatchedCount()) {
            throw StreamNotFound::with($streamName);
        }
    }

    /**
     * Queries are not supported by this event store.
     *
     * @throws \BadMethodCallException always
     */
    public function createQuery(QueryFactory $factory = null): Query
    {
        throw new \BadMethodCallException(__METHOD__ . ' not supported');
    }

    /**
     * Event-emitting projections are not supported by this event store.
     *
     * @throws \BadMethodCallException always
     */
    public function createProjection(
        string $name,
        ProjectionOptions $options = null,
        ProjectionFactory $factory = null
    ): Projection {
        throw new \BadMethodCallException(__METHOD__ . ' not supported');
    }

    /**
     * Creates a read model projection that stores its progress in MongoDB.
     *
     * The given factory is ignored; a MongoReadModelProjection is always returned.
     */
    public function createReadModelProjection(
        string $name,
        ReadModel $readModel,
        ProjectionOptions $options = null,
        ReadModelProjectionFactory $factory = null
    ): ReadModelProjection {
        if (null === $options) {
            $options = new ProjectionOptions();
        }

        return new MongoReadModelProjection(
            $this,
            $name,
            $readModel,
            $this->mongoConnection,
            $options->cacheSize(),
            $options->persistBlockSize()
        );
    }

    /**
     * @throws \BadMethodCallException always
     */
    public function getDefaultQueryFactory(): QueryFactory
    {
        throw new \BadMethodCallException(__METHOD__ . ' not supported');
    }

    /**
     * @throws \BadMethodCallException always
     */
    public function getDefaultProjectionFactory(): ProjectionFactory
    {
        throw new \BadMethodCallException(__METHOD__ . ' not supported');
    }

    /**
     * @throws \BadMethodCallException always
     */
    public function getDefaultReadModelProjectionFactory(): ReadModelProjectionFactory
    {
        throw new \BadMethodCallException(__METHOD__ . ' not supported');
    }

    /**
     * Converts a message into the document shape stored in an event collection.
     *
     * The creation time is written without a time zone; eventDataToMessage() reads it
     * back as UTC.
     *
     * @param Message $e
     * @return array
     */
    private function prepareEventData(Message $e): array
    {
        $eventArr = $this->messageConverter->convertToArray($e);

        return [
            '_id' => $eventArr['uuid'],
            'event_name' => $eventArr['message_name'],
            'payload' => $eventArr['payload'],
            'created_at' => $eventArr['created_at']->format('Y-m-d\TH:i:s.u'),
            'metadata' => $eventArr['metadata'],
        ];
    }

    /**
     * Rebuilds a message from a stored event document; the `no` field is not passed on.
     */
    private function eventDataToMessage(array $eventData): Message
    {
        $createdAt = \DateTimeImmutable::createFromFormat(
            'Y-m-d\TH:i:s.u',
            $eventData['created_at'],
            new \DateTimeZone('UTC')
        );

        return $this->messageFactory->createMessageFromArray($eventData['event_name'], [
            'uuid' => $eventData['_id'],
            'created_at' => $createdAt,
            'payload' => $eventData['payload'],
            'metadata' => $eventData['metadata'],
        ]);
    }

    /**
     * Selects the event collection of a stream and makes sure its indexes exist.
     *
     * The createIndex commands are sent on every call, i.e. for each load and append.
     */
    private function getCollectionByStreamName(StreamName $streamName): Collection
    {
        $name = $streamName->toString();
        $collection = $this->mongoConnection->selectCollection($name);

        // The unique index on `no` is what rejects two events with the same sequence number.
        $collection->createIndex([
            'no' => 1,
        ], ['unique' => true, 'name' => 'no_idx']);

        if (in_array($name, $this->aggregateStreamNames, true)) {
            $collection->createIndex([
                'metadata._aggregate_id' => 1,
                'metadata._aggregate_version' => 1,
            ], ['unique' => true, 'name' => 'aggregate_version_idx']);
        }

        return $collection;
    }

    /**
     * Translates metadata constraints into a MongoDB filter on `metadata.<field>`.
     *
     * Several constraints on the same field are combined in one condition document, so a
     * range such as "> a" and "< b" keeps both bounds.
     *
     * @throws \InvalidArgumentException for an operator that has no MongoDB translation
     */
    private function buildQuery(MetadataMatcher $matcher): array
    {
        $query = [];

        foreach ($matcher->data() as $match) {
            $operator = $match['operator']->getValue();

            if (! isset(self::OPERATOR_MAP[$operator])) {
                throw new \InvalidArgumentException('Unsupported metadata operator: ' . $operator);
            }

            $query['metadata.' . $match['field']][self::OPERATOR_MAP[$operator]] = $match['value'];
        }

        return $query;
    }

    /**
     * Builds the filter for load()/loadReverse(): metadata constraints plus a bound on `no`.
     *
     * @param MetadataMatcher|null $metadataMatcher optional metadata constraints
     * @param string               $comparison      MongoDB comparison operator applied to `no`
     * @param int                  $number          value `no` is compared against
     */
    private function buildEventQuery(?MetadataMatcher $metadataMatcher, string $comparison, int $number): array
    {
        $query = $this->buildQuery($metadataMatcher ?? new MetadataMatcher());
        $query['no'] = [$comparison => $number];

        return $query;
    }

    /**
     * Builds the find options: sort direction on `no` and, for a truthy count, a limit.
     */
    private function buildFindOptions(int $direction, ?int $count): array
    {
        $options = [
            'sort' => ['no' => $direction],
        ];

        if ($count) {
            $options['limit'] = $count;
        }

        return $options;
    }

    /**
     * Wraps a cursor over event documents into a stream of messages.
     */
    private function createStream(StreamName $streamName, Cursor $cursor): Stream
    {
        $iterator = $this->mapCursor($cursor, function (array $event) {
            return $this->eventDataToMessage($event);
        });

        return new Stream($streamName, $iterator);
    }

    /**
     * Reserves the next sequence number of the stream and inserts the event with it.
     *
     * The counter update and the insert are two separate commands. When the insert fails
     * with a BulkWriteException the counter is decremented again before the exception is
     * rethrown; any other failure leaves the reserved number unused.
     *
     * @throws StreamNotFound     when the stream was never created
     * @throws BulkWriteException when the event cannot be inserted
     */
    private function insertInto(StreamName $streamName, array $eventData): void
    {
        $streamInfo = $this->mongoConnection->selectCollection(self::STREAM_COLLECTION)
            ->findOneAndUpdate(
                ['_id' => $streamName->toString()],
                ['$inc' => ['seq' => 1]],
                ['returnDocument' => FindOneAndUpdate::RETURN_DOCUMENT_AFTER]
            );

        if (! $streamInfo) {
            throw StreamNotFound::with($streamName);
        }

        $eventData['no'] = $streamInfo['seq'];

        try {
            $this->getCollectionByStreamName($streamName)->insertOne($eventData);
        } catch (BulkWriteException $e) {
            // Hand the reserved sequence number back.
            $this->mongoConnection->selectCollection(self::STREAM_COLLECTION)->updateOne(
                ['_id' => $streamName->toString()],
                ['$inc' => ['seq' => -1]]
            );

            throw $e;
        }
    }

    /**
     * Returns an iterator that applies $callback to every document of the cursor.
     */
    private function mapCursor(Cursor $cursor, callable $callback): \IteratorIterator
    {
        return new class($cursor, $callback) extends \IteratorIterator {
            /**
             * Function applied to every element of the inner iterator.
             *
             * @var callable
             */
            private $callable;

            /**
             * @param Cursor   $cursor   cursor to iterate
             * @param callable $callable receives the current document and its key
             */
            public function __construct(Cursor $cursor, callable $callable)
            {
                parent::__construct($cursor);
                $this->callable = $callable;
            }

            /**
             * Iteration continues for as long as the cursor does not report itself dead.
             *
             * NOTE(review): this replaces the default validity check of IteratorIterator;
             * verify how Cursor::isDead() behaves for empty and single-batch results.
             */
            public function valid(): bool
            {
                return ! $this->getInnerIterator()->isDead();
            }

            /**
             * Returns the mapped value of the current element.
             */
            public function current()
            {
                $callback = $this->callable;

                return $callback(parent::current(), parent::key());
            }
        };
    }
}
<?php
declare(strict_types = 1);
namespace Acme\Infrastructure\MongoDb;
use Closure;
use Acme\Model\Message;
use MongoDB\Exception\BadMethodCallException;
use Prooph\EventStore\EventStore;
use Prooph\EventStore\Exception\RuntimeException;
use Prooph\EventStore\Exception\StreamNotFound;
use Prooph\EventStore\Projection\ReadModel;
use Prooph\EventStore\Projection\ReadModelProjection;
use Prooph\EventStore\StreamName;
/**
 * Read model projection that keeps its progress in the MongoDB `projections` collection.
 *
 * The projection document stores the position reached in every stream, the projection
 * state and a `locked_until` timestamp that prevents two processes from running the same
 * projection at the same time.
 */
final class MongoReadModelProjection implements ReadModelProjection
{
    const PROJECTIONS_COLLECTION = 'projections';
    const LOCK_TIMEOUT_MS = 1000;

    /**
     * Format of the `locked_until` timestamps. Fixed-width, so that the string comparison
     * done by the lock query orders the values chronologically.
     */
    private const LOCK_TIME_FORMAT = 'Y-m-d\TH:i:s.u';

    /**
     * @var string
     */
    private $name;

    /**
     * @var EventStore
     */
    private $eventStore;

    /**
     * @var ReadModel
     */
    private $readModel;

    /**
     * @var MongoConnection
     */
    private $mongoConnection;

    /**
     * Number of events already handled, keyed by stream name; null until a from*() call.
     *
     * @var array|null
     */
    private $streamPositions;

    /**
     * @var array
     */
    private $state = [];

    /**
     * @var callable|null
     */
    private $initCallback;

    /**
     * Handler registered with whenAny().
     *
     * @var Closure|null
     */
    private $handler;

    /**
     * Handlers registered with when(), keyed by event name.
     *
     * @var array
     */
    private $handlers = [];

    /**
     * Stream currently being handled; shared by reference with the handler context.
     *
     * @var string|null
     */
    private $currentStreamName = null;

    /**
     * @var bool
     */
    private $isStopped = false;

    /**
     * Events handled since the last persist.
     *
     * @var int
     */
    private $eventCounter = 0;

    /**
     * @var int
     */
    private $cacheSize;

    /**
     * @var int
     */
    private $persistBlockSize;

    /**
     * Time (microtime(true)) at which this instance last wrote `locked_until`.
     *
     * @var float
     */
    private $lockRefreshedAt = 0.0;

    /**
     * @param EventStore      $eventStore       store the events are loaded from
     * @param string          $name             projection name, used as document id
     * @param ReadModel       $readModel        read model maintained by the handlers
     * @param MongoConnection $mongoConnection  connection holding the projections collection
     * @param int             $cacheSize        stored but not used by this class
     * @param int             $persistBlockSize number of events after which progress is persisted
     */
    public function __construct(
        EventStore $eventStore,
        string $name,
        ReadModel $readModel,
        MongoConnection $mongoConnection,
        int $cacheSize,
        int $persistBlockSize
    ) {
        $this->eventStore = $eventStore;
        $this->name = $name;
        $this->readModel = $readModel;
        $this->mongoConnection = $mongoConnection;
        $this->cacheSize = $cacheSize;
        $this->persistBlockSize = $persistBlockSize;
    }

    /**
     * Registers the callback that produces the initial state and runs it once.
     *
     * The callback has to return an array; any other result leaves the state untouched.
     *
     * @throws \RuntimeException when called a second time
     */
    public function init(Closure $callback): ReadModelProjection
    {
        if (null !== $this->initCallback) {
            throw new \RuntimeException('Projection already initialized');
        }

        $callback = Closure::bind($callback, $this->createHandlerContext($this->currentStreamName));

        $result = $callback();

        if (is_array($result)) {
            $this->state = $result;
        }

        $this->initCallback = $callback;

        return $this;
    }

    /**
     * Configures the projection to read a single stream.
     *
     * @throws \RuntimeException when a from*() method was called before
     */
    public function fromStream(string $streamName): ReadModelProjection
    {
        if (null !== $this->streamPositions) {
            throw new \RuntimeException('From was already called');
        }

        $this->streamPositions = [$streamName => 0];

        return $this;
    }

    /**
     * Configures the projection to read several streams.
     *
     * @throws \RuntimeException when a from*() method was called before
     */
    public function fromStreams(string ...$streamNames): ReadModelProjection
    {
        if (null !== $this->streamPositions) {
            throw new \RuntimeException('From was already called');
        }

        foreach ($streamNames as $streamName) {
            $this->streamPositions[$streamName] = 0;
        }

        return $this;
    }

    /**
     * @throws BadMethodCallException always
     */
    public function fromCategory(string $name): ReadModelProjection
    {
        throw new BadMethodCallException(__METHOD__ . ' not supported');
    }

    /**
     * @throws BadMethodCallException always
     */
    public function fromCategories(string ...$names): ReadModelProjection
    {
        throw new BadMethodCallException(__METHOD__ . ' not supported');
    }

    /**
     * @throws BadMethodCallException always
     */
    public function fromAll(): ReadModelProjection
    {
        throw new BadMethodCallException(__METHOD__ . ' not supported');
    }

    /**
     * Registers one handler per event name.
     *
     * @param array $handlers closures keyed by event name
     *
     * @throws \RuntimeException         when handlers were registered before
     * @throws \InvalidArgumentException for a non-string key or a non-closure handler
     */
    public function when(array $handlers): ReadModelProjection
    {
        if (null !== $this->handler || ! empty($this->handlers)) {
            throw new \RuntimeException('When was already called');
        }

        foreach ($handlers as $eventName => $handler) {
            if (! is_string($eventName)) {
                throw new \InvalidArgumentException('Invalid event name given, string expected');
            }

            if (! $handler instanceof Closure) {
                throw new \InvalidArgumentException('Invalid handler given, Closure expected');
            }

            $this->handlers[$eventName] = Closure::bind($handler, $this->createHandlerContext($this->currentStreamName));
        }

        return $this;
    }

    /**
     * Registers a single handler that receives every event.
     *
     * @throws \RuntimeException when handlers were registered before
     */
    public function whenAny(Closure $handler): ReadModelProjection
    {
        if (null !== $this->handler || ! empty($this->handlers)) {
            throw new \RuntimeException('When was already called');
        }

        $this->handler = Closure::bind($handler, $this->createHandlerContext($this->currentStreamName));

        return $this;
    }

    /**
     * Makes run() return after the event currently being handled.
     */
    public function stop(): void
    {
        $this->isStopped = true;
    }

    public function getState(): array
    {
        return $this->state;
    }

    public function getName(): string
    {
        return $this->name;
    }

    public function readModel(): ReadModel
    {
        return $this->readModel;
    }

    /**
     * Deletes the read model together with the stored progress of the projection.
     *
     * Nothing happens when $deleteProjection is false. Otherwise the read model is
     * deleted, the projection document is removed and the in-memory progress is cleared,
     * so that a later run() starts from the first event again instead of continuing
     * behind a read model that no longer exists.
     */
    public function delete(bool $deleteProjection): void
    {
        if (! $deleteProjection) {
            return;
        }

        $this->readModel->delete();

        $this->mongoConnection->selectCollection(self::PROJECTIONS_COLLECTION)
            ->deleteOne(['_id' => $this->name]);

        $this->clearProgress();
    }

    /**
     * Runs the projection: loads new events of every configured stream and hands them to
     * the registered handlers.
     *
     * @param bool     $keepRunning keep polling for new events until stop() is called
     * @param int|null $usleep      microseconds to sleep after a round without events; null disables sleeping
     *
     * @throws RuntimeException when another process holds the projection lock
     */
    public function run(bool $keepRunning = true, ?int $usleep = 100): void
    {
        $this->createProjectionIfNotExist();
        $this->acquireLock();

        try {
            do {
                $this->load();

                foreach ($this->streamPositions as $streamName => $position) {
                    // Numeric stream names turn into int array keys; cast them back
                    // because the calls below require strings under strict_types.
                    $streamName = (string) $streamName;

                    try {
                        $stream = $this->eventStore->load(new StreamName($streamName), $position + 1);
                    } catch (StreamNotFound $e) {
                        // no newer events found
                        continue;
                    }

                    $this->handleStream($streamName, $stream->streamEvents());

                    if ($this->isStopped) {
                        break;
                    }
                }

                if (0 === $this->eventCounter) {
                    // Only persist() extends the lock, so an idle projection has to
                    // extend it explicitly or it expires while the process still polls.
                    $this->refreshLockIfDue();

                    if (null !== $usleep) {
                        usleep($usleep);
                    }
                } else {
                    $this->persist();
                }

                $this->eventCounter = 0;
            } while ($keepRunning && ! $this->isStopped);
        } finally {
            $this->releaseLock();
        }
    }

    /**
     * Resets positions, state and read model so that the projection starts over.
     *
     * The lock is taken before anything is changed and released even when a step fails.
     *
     * @throws RuntimeException when another process holds the projection lock
     */
    public function reset(): void
    {
        $this->createProjectionIfNotExist();
        $this->acquireLock();

        try {
            $this->isStopped = false;
            $this->clearProgress();

            // NOTE(review): this deletes an event stream named like the projection, which
            // looks inherited from event-emitting projections — confirm it is wanted here.
            $this->eventStore->delete(new StreamName($this->name));

            $this->readModel->reset();

            $this->persist();
        } finally {
            $this->releaseLock();
        }
    }

    /**
     * Inserts the projection document; an insert failure is ignored on purpose.
     */
    protected function createProjectionIfNotExist(): void
    {
        $col = $this->mongoConnection->selectCollection(self::PROJECTIONS_COLLECTION);

        try {
            $col->insertOne([
                '_id' => $this->name,
                'position' => [],
                'state' => [],
                'locked_until' => null,
            ]);
        } catch (\Throwable $ex) {
            // ignore errors, especially duplicate key errors: the document usually exists already
        }
    }

    /**
     * Reads stored positions and state from the projection document.
     *
     * Stored positions are merged over the configured streams instead of replacing them:
     * a freshly created document has an empty position list, and replacing would leave
     * the projection without any stream to read.
     *
     * @throws \RuntimeException when the projection document does not exist
     */
    protected function load(): void
    {
        $col = $this->mongoConnection->selectCollection(self::PROJECTIONS_COLLECTION);

        $doc = $col->findOne(['_id' => $this->name]);

        if (! $doc) {
            throw new \RuntimeException('Projection information for ' . $this->name . ' missing in collection ' . self::PROJECTIONS_COLLECTION);
        }

        // The cast accepts a plain array as well as an array-like document object.
        $this->streamPositions = array_replace($this->streamPositions ?? [], (array) $doc['position']);

        $state = $doc['state'];

        if (! empty($state)) {
            $this->state = $state;
        }
    }

    /**
     * Persists the read model, then stores positions and state and extends the lock.
     */
    protected function persist(): void
    {
        $this->readModel()->persist();

        $this->mongoConnection->selectCollection(self::PROJECTIONS_COLLECTION)->updateOne([
            '_id' => $this->name,
        ], [
            '$set' => [
                'position' => $this->streamPositions,
                'state' => $this->state,
                'locked_until' => $this->createLockUntilString($this->utcNow()),
            ],
        ]);

        $this->lockRefreshedAt = microtime(true);
    }

    /**
     * Takes the projection lock when it is free or expired.
     *
     * @throws RuntimeException when the lock is held by someone else or the document is missing
     */
    protected function acquireLock(): void
    {
        $now = $this->utcNow();

        $result = $this->mongoConnection->selectCollection(self::PROJECTIONS_COLLECTION)->updateOne([
            '_id' => $this->name,
            '$or' => [
                [
                    'locked_until' => null,
                ],
                [
                    'locked_until' => ['$lt' => $now->format(self::LOCK_TIME_FORMAT)],
                ],
            ],
        ], [
            '$set' => [
                'locked_until' => $this->createLockUntilString($now),
            ],
        ]);

        if ($result->getMatchedCount() !== 1) {
            throw new RuntimeException('Another projection process is already running');
        }

        $this->lockRefreshedAt = microtime(true);
    }

    /**
     * Clears `locked_until` on the projection document.
     */
    protected function releaseLock(): void
    {
        $this->mongoConnection->selectCollection(self::PROJECTIONS_COLLECTION)->updateOne([
            '_id' => $this->name,
        ], [
            '$set' => [
                'locked_until' => null,
            ],
        ]);
    }

    /**
     * Extends the lock once half of the lock timeout has passed since it was last written.
     */
    private function refreshLockIfDue(): void
    {
        $elapsedMs = (microtime(true) - $this->lockRefreshedAt) * 1000;

        if ($elapsedMs < self::LOCK_TIMEOUT_MS / 2) {
            return;
        }

        $this->mongoConnection->selectCollection(self::PROJECTIONS_COLLECTION)->updateOne([
            '_id' => $this->name,
        ], [
            '$set' => [
                'locked_until' => $this->createLockUntilString($this->utcNow()),
            ],
        ]);

        $this->lockRefreshedAt = microtime(true);
    }

    /**
     * Returns the current time in UTC.
     */
    private function utcNow(): \DateTimeImmutable
    {
        return new \DateTimeImmutable('now', new \DateTimeZone('UTC'));
    }

    /**
     * Formats the moment at which a lock taken at $from expires.
     */
    private function createLockUntilString(\DateTimeImmutable $from): string
    {
        return $from->modify('+' . self::LOCK_TIMEOUT_MS . ' ms')->format(self::LOCK_TIME_FORMAT);
    }

    /**
     * Sets every known stream position back to 0 and restores the initial state.
     */
    private function clearProgress(): void
    {
        if (null !== $this->streamPositions) {
            $this->streamPositions = array_fill_keys(array_keys($this->streamPositions), 0);
        }

        $callback = $this->initCallback;

        if (is_callable($callback)) {
            $result = $callback();

            if (is_array($result)) {
                $this->state = $result;
            }
        } else {
            $this->state = [];
        }
    }

    /**
     * Feeds the events of one stream to the registered handler(s).
     *
     * Every event advances the stream position, whether or not a handler exists for it.
     * The persist-block check runs for every event as well, so an unhandled event cannot
     * step over the threshold.
     */
    private function handleStream(string $streamName, \Iterator $events): void
    {
        $this->currentStreamName = $streamName;

        foreach ($events as $event) {
            /* @var Message $event */
            $this->streamPositions[$streamName]++;
            $this->eventCounter++;

            // A whenAny() handler takes every event; otherwise look one up by event name.
            $handler = $this->handler ?? ($this->handlers[$event->messageName()] ?? null);

            if (null !== $handler) {
                $result = $handler($this->state, $event);

                if (is_array($result)) {
                    $this->state = $result;
                }
            }

            if ($this->eventCounter === $this->persistBlockSize) {
                $this->persist();
                $this->eventCounter = 0;
            }

            if ($this->isStopped) {
                break;
            }
        }
    }

    /**
     * Creates the object that handler closures are bound to as $this.
     *
     * The stream name is shared by reference, so the context reports the stream that is
     * being handled at the time a handler calls streamName().
     */
    private function createHandlerContext(?string &$streamName)
    {
        return new class($this, $streamName) {
            /**
             * @var ReadModelProjection
             */
            private $projection;

            /**
             * @var ?string
             */
            private $streamName;

            public function __construct(ReadModelProjection $projection, ?string &$streamName)
            {
                $this->projection = $projection;
                $this->streamName = &$streamName;
            }

            public function stop(): void
            {
                $this->projection->stop();
            }

            public function readModel(): ReadModel
            {
                return $this->projection->readModel();
            }

            public function streamName(): ?string
            {
                return $this->streamName;
            }
        };
    }
}
@codeliner

This comment has been minimized.

Copy link
Owner Author

commented Dec 8, 2016

Note: This is an example of a MongoDb event store implementing the Prooph\EventStore v7 basic interface.
Due to limited ACID support we need to use some "dirty" hacks and accept some limitations.

Limitations:

  • You cannot create a new stream and append events to it in one operation.
  • You cannot append more than one event to a stream in one operation.

Dirty:

  • MongoEventStore::insertInto maintains a sequence per stream but is not transaction-safe
  • MongoEventStore::delete is not transaction-safe

When and why should you use the MongoEventStore

  • If you <3 MongoDb for projections, want to use a single database for your system and you can live with the tradeoffs
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.