-
-
Save webdevilopers/5903d2c217aea5fbe31a3b86e13ee9da to your computer and use it in GitHub Desktop.
<?php | |
namespace Acme\Context\Infrastructure\Prooph; | |
use Prooph\Common\Event\ActionEvent; | |
use Prooph\EventStore\ActionEventEmitterEventStore; | |
use Prooph\EventStore\EventStore; | |
use Prooph\EventStore\Plugin\AbstractPlugin; | |
use Prooph\EventStore\TransactionalActionEventEmitterEventStore; | |
use Symfony\Component\Messenger\MessageBusInterface; | |
/**
 * Event store plugin that forwards recorded events to a Symfony Messenger bus.
 *
 * Outside of a transaction the events are dispatched as soon as the
 * append/create action has been processed. Inside a transaction they are
 * buffered and dispatched on commit; a rollback discards the buffer.
 */
final class EventPublisher extends AbstractPlugin
{
    /**
     * @var MessageBusInterface
     */
    private $eventBus;

    /**
     * Event streams recorded during the running transaction, awaiting commit.
     *
     * @var \Iterator[]
     */
    private $cachedEventStreams = [];

    public function __construct(MessageBusInterface $eventBus)
    {
        $this->eventBus = $eventBus;
    }

    /**
     * Registers the listeners for append, create and (when the store is
     * transactional) commit and rollback.
     */
    public function attachToEventStore(ActionEventEmitterEventStore $eventStore): void
    {
        $this->listenerHandlers[] = $eventStore->attach(
            ActionEventEmitterEventStore::EVENT_APPEND_TO,
            function (ActionEvent $event) use ($eventStore): void {
                // A flagged action means the append failed. Check this before
                // buffering as well, so that a transaction which is committed
                // despite the failure never publishes these events.
                if ($event->getParam('streamNotFound', false)
                    || $event->getParam('concurrencyException', false)
                ) {
                    return;
                }

                $this->publishOrCache(
                    $eventStore,
                    $event->getParam('streamEvents', new \ArrayIterator())
                );
            }
        );

        $this->listenerHandlers[] = $eventStore->attach(
            ActionEventEmitterEventStore::EVENT_CREATE,
            function (ActionEvent $event) use ($eventStore): void {
                // Same rule as for append: a failed create publishes nothing,
                // whether or not a transaction is open.
                if ($event->getParam('streamExistsAlready', false)) {
                    return;
                }

                $this->publishOrCache(
                    $eventStore,
                    $event->getParam('stream')->streamEvents()
                );
            }
        );

        if ($eventStore instanceof TransactionalActionEventEmitterEventStore) {
            $this->listenerHandlers[] = $eventStore->attach(
                TransactionalActionEventEmitterEventStore::EVENT_COMMIT,
                function (ActionEvent $event): void {
                    // Detach the buffer before dispatching: if a handler throws
                    // or triggers a nested commit, the events already handed
                    // out must not be dispatched a second time.
                    $committedStreams = $this->cachedEventStreams;
                    $this->cachedEventStreams = [];

                    foreach ($committedStreams as $stream) {
                        $this->dispatchAll($stream);
                    }
                }
            );

            $this->listenerHandlers[] = $eventStore->attach(
                TransactionalActionEventEmitterEventStore::EVENT_ROLLBACK,
                function (ActionEvent $event): void {
                    $this->cachedEventStreams = [];
                }
            );
        }
    }

    /**
     * Dispatches the events right away, or buffers them until commit while the
     * event store is in a transaction.
     */
    private function publishOrCache(EventStore $eventStore, iterable $recordedEvents): void
    {
        if ($this->inTransaction($eventStore)) {
            $this->cachedEventStreams[] = $recordedEvents;

            return;
        }

        $this->dispatchAll($recordedEvents);
    }

    /**
     * Sends every event of the given stream to the message bus, in order.
     */
    private function dispatchAll(iterable $recordedEvents): void
    {
        foreach ($recordedEvents as $recordedEvent) {
            $this->eventBus->dispatch($recordedEvent);
        }
    }

    /**
     * Tells whether the store supports transactions and currently has one open.
     */
    private function inTransaction(EventStore $eventStore): bool
    {
        return $eventStore instanceof TransactionalActionEventEmitterEventStore
            && $eventStore->inTransaction();
    }
}
services:
    _defaults:
        public: false

    # The service keeps the id of the prooph bridge publisher but is backed by
    # the application's own Messenger-based implementation.
    Prooph\EventStoreBusBridge\EventPublisher:
        class: Acme\Context\Infrastructure\Prooph\EventPublisher
        arguments:
            #- '@prooph_service_bus.default_event_bus'
            - '@messenger.bus.events'
        tags:
            # Tag name suggests registration as a plugin of the default store.
            - { name: 'prooph_event_store.default_store.plugin' }
Thanks @gquemener !
If I understand the event publisher I use with the symfony event bus here correctly then it guarantees the same, right?
And it is not coupled to the Doctrine middleware since the configuration is appended to the prooph event stores:
# config/packages/prooph_event_store_bus_bridge.yaml
services:
    _defaults:
        public: false

    # The service keeps the id of the prooph bridge publisher but is backed by
    # the application's own Messenger-based implementation.
    Prooph\EventStoreBusBridge\EventPublisher:
        class: Acme\Context\Infrastructure\Prooph\EventPublisher
        arguments:
            #- '@prooph_service_bus.default_event_bus'
            - '@messenger.bus.events'
        tags:
            - { name: 'prooph_event_store.default_store.plugin' }
Default PDO configuration for prooph:
# config/packages/prooph_pdo_event_store.yaml
services:
    _defaults:
        public: false

    Prooph\EventSourcing\EventStoreIntegration\AggregateTranslator: null

    # Alias so the EventStore interface resolves to the default store below.
    Prooph\EventStore\EventStore: '@app.event_store.default'

    app.event_store.default:
        class: Prooph\EventStore\Pdo\MariaDbEventStore
        arguments:
            - '@prooph_event_store.message_factory'
            - '@app.event_store.pdo_connection.mysql'
            - '@app.event_store.mysql.persistence_strategy'

    # Connection settings are read from environment variables.
    app.event_store.pdo_connection.mysql:
        class: \PDO
        arguments:
            - '%env(MYSQL_DSN)%'
            - '%env(MYSQL_USER)%'
            - '%env(MYSQL_PASSWORD)%'

    app.event_store.mysql.persistence_strategy:
        class: Prooph\EventStore\Pdo\PersistenceStrategy\MariaDbSingleStreamStrategy
For further reading:
If I understand the event publisher I use with the symfony event bus here correctly then it guarantees the same, right?
That's correct.
However, just as I'm coupling my event publisher to doctrine, you're coupling yours to the Prooph EventStore (because it is a plugin).
That's an acceptable tradeoff, but the event dispatching could as well happen in your repository, after you've called the appropriate methods on your storage mechanism (an EventStore, a Doctrine EntityManager, ...) — in short, at any moment after you have the guarantee that the AR has been synchronized with the db:
/**
 * Sketch of a repository that publishes an aggregate's events itself:
 * persist first, then dispatch whatever the aggregate recorded.
 */
final class TodoRepository implements Repository
{
    public function save(Todo $todo): void
    {
        $this->storage->persist($todo);

        // Dispatch only after persist() has returned, so no event is
        // published for a write that failed.
        foreach ($todo->popEvents() as $event) {
            $this->eventBus->dispatch($event);
        }
    }
}
You could even use some db mechanism that triggers events on successful write (binlog, NOTIFY, ...), anything that is a good tradeoff between maintainability and the guarantee that the event is emitted when the initial fact has been somehow persisted.
Thanks for the feedback. Indeed there is coupling to a "third party" here. But I think that there are a lot of prooph event-store users out there who would love to switch to the messenger and keep their current config, and finally replace the deprecated service-bus-bridge.
I will try to move my event publisher to a plugin and see if it works. Then we could propose it to other users.
Maybe @weaverryan thinks this is a good enhancement for the messenger+prooph users.
The middleware looks something like this:
The AR collector is a doctrine listener (postPersist, postLoad, postUpdate and postRemove) that keeps a reference to any AR going through the entity manager.
This solution is not ideal because it is tied to the command bus (events are dispatched after the command has been handled), whereas I realize it should be tied to the AR repository (or the underlying storage, like the EventPublisher is doing), but for my current project this is an acceptable trade-off.
The main concern is to have the guarantee that the events are dispatched when, and only when, db transaction has been committed.