Skip to content

Instantly share code, notes, and snippets.

@codeliner
Last active February 14, 2016 17:22
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save codeliner/b869bd43445cb06f0d10 to your computer and use it in GitHub Desktop.
Save codeliner/b869bd43445cb06f0d10 to your computer and use it in GitHub Desktop.
An example of a decorated Prooph\EventStore\Adapter\DoctrineEventStoreAdapter that uses a shared database connection between write and read model. This can be useful if you want to avoid eventual consistency issues (only possible if same database is used for write/read).
<?php
namespace App\Infrastructure\EventStore;
use DateTimeInterface;
use Iterator;
use Prooph\EventStore\Adapter\Adapter;
use Prooph\EventStore\Adapter\Doctrine\DoctrineEventStoreAdapter;
use Prooph\EventStore\Exception\StreamNotFoundException;
use Prooph\EventStore\Stream\Stream;
use Prooph\EventStore\Stream\StreamName;
/**
* Class DoctrineSharedConnectionAdapter
* Use a shared Doctrine\DBAL\Connection and don't forward transaction handling to it as the transaction is
* handled elsewhere.
*
* @package App\Infrastructure\EventStore
*/
final class DoctrineSharedConnectionAdapter implements Adapter
{
/**
* @var DoctrineEventStoreAdapter
*/
private $doctrineEventStoreAdapter;
public function __construct(DoctrineEventStoreAdapter $adapter)
{
$this->doctrineEventStoreAdapter = $adapter;
}
/**
* @param Stream $stream
* @return void
*/
public function create(Stream $stream)
{
$this->doctrineEventStoreAdapter->create($stream);
}
/**
* @param StreamName $streamName
* @param Iterator $domainEvents
* @throws StreamNotFoundException If stream does not exist
* @return void
*/
public function appendTo(StreamName $streamName, Iterator $domainEvents)
{
$this->doctrineEventStoreAdapter->appendTo($streamName, $domainEvents);
}
/**
* @param StreamName $streamName
* @param null|int $minVersion Minimum version an event should have
* @return Stream|null
*/
public function load(StreamName $streamName, $minVersion = null)
{
return $this->doctrineEventStoreAdapter->load($streamName, $minVersion);
}
/**
* @param StreamName $streamName
* @param array $metadata If empty array is provided, then all events should be returned
* @param null|int $minVersion Minimum version an event should have
* @return Iterator
*/
public function loadEvents(StreamName $streamName, array $metadata = [], $minVersion = null)
{
return $this->doctrineEventStoreAdapter->loadEvents($streamName, $metadata, $minVersion);
}
/**
* @param StreamName $streamName
* @param DateTimeInterface|null $since
* @param array $metadata
* @return Iterator
*/
public function replay(StreamName $streamName, DateTimeInterface $since = null, array $metadata = [])
{
return $this->doctrineEventStoreAdapter->replay($streamName, $since, $metadata);
}
}
namespace App\Infrastructure\EventStore;
use Interop\Container\ContainerInterface;
use Prooph\EventStore\Adapter\Doctrine\DoctrineEventStoreAdapter;
final class DoctrineSharedConnectionAdapterFactory
{
public function __invoke(ContainerInterface $container)
{
return new DoctrineSharedConnectionAdapter($container->get(DoctrineEventStoreAdapter::class));
}
}
<?php
return [
'prooph' => [
'event_store' => [
'plugins' => [
],
'adapter' => [
//Drop in replacement for the normal DoctrineEventStoreAdapter
'type' => \App\Infrastructure\EventStore\DoctrineSharedConnectionAdapter::class,
'options' => [
//Shared doctrine connection passed to DoctrineEventStoreAdapter as well as used to populate read model
'connection_alias' => 'doctrine.connection.default',
],
],
'user_collection' => [
'repository_class' => \App\Infrastructure\Repository\EventStoreUserCollection::class,
'aggregate_type' => \App\Model\User\User::class,
'aggregate_translator' => \Prooph\EventSourcing\EventStoreIntegration\AggregateTranslator::class
],
]
]
];
<?php
namespace App\Http;
use App\Infrastructure\Repository\EventStoreUserCollection;
use App\Model\User\User;
use Doctrine\DBAL\Connection;
use Prooph\Common\Event\ActionEvent;
use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseInterface;
use Zend\Diactoros\Response\TextResponse;
final class RegisterUser
{
/**
* @var EventStoreUserCollection
*/
private $userCollection;
/**
* @var Connection
*/
private $connection;
public function __construct(EventStoreUserCollection $userCollection, Connection $connection) {
$this->userCollection = $userCollection;
$this->connection = $connection;
}
public function __invoke(RequestInterface $request, ResponseInterface $response, callable $next)
{
$eventStore = $this->userCollection->getEventStore();
//Emulate transaction manager, open both: database and event store transaction as they are no longer the same
$this->connection->beginTransaction();
$eventStore->beginTransaction();
try {
$user = User::register('1', 'John Doe');
//Normal usage of prooh/event-store repository
$this->userCollection->add($user);
$userWasRegistered = null;
//Fake a read model projector
$eventStore->getActionEventEmitter()->attachListener('commit.post', function(ActionEvent $event) use (&$userWasRegistered) {
$recordedEvents = iterator_to_array($event->getParam('recordedEvents'));
$userWasRegistered = $recordedEvents[0];
});
//First commit event store transaction so that "commit.post" gets triggered
$eventStore->commit();
//Our fake projector has received the event, we can populuate the read model
$this->connection->insert('users', ['id' => $userWasRegistered->userId(), 'name' => $userWasRegistered->username()]);
//Now commit database transaction to persist both: UserWasRegistered event in "event_stream" and user data in the table "users"
$this->connection->commit();
} catch (\Exception $e) {
//Only roll back database transaction, event-store roll back is not possible as it is not needed and would throw an exception
$this->connection->rollBack();
throw $e;
}
return new TextResponse('it works');
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment