Skip to content

Instantly share code, notes, and snippets.

@webdevilopers
Created June 4, 2020 09:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save webdevilopers/5903d2c217aea5fbe31a3b86e13ee9da to your computer and use it in GitHub Desktop.
Save webdevilopers/5903d2c217aea5fbe31a3b86e13ee9da to your computer and use it in GitHub Desktop.
Replacing Prooph Service Event Bus with Symfony Messenger using custom Event Publisher
<?php
namespace Acme\Context\Infrastructure\Prooph;
use Prooph\Common\Event\ActionEvent;
use Prooph\EventStore\ActionEventEmitterEventStore;
use Prooph\EventStore\EventStore;
use Prooph\EventStore\Plugin\AbstractPlugin;
use Prooph\EventStore\TransactionalActionEventEmitterEventStore;
use Symfony\Component\Messenger\MessageBusInterface;
final class EventPublisher extends AbstractPlugin
{
/**
* @var MessageBusInterface
*/
private $eventBus;
/**
* @var \Iterator[]
*/
private $cachedEventStreams = [];
public function __construct(MessageBusInterface $eventBus)
{
$this->eventBus = $eventBus;
}
public function attachToEventStore(ActionEventEmitterEventStore $eventStore): void
{
$this->listenerHandlers[] = $eventStore->attach(
ActionEventEmitterEventStore::EVENT_APPEND_TO,
function (ActionEvent $event) use ($eventStore): void {
$recordedEvents = $event->getParam('streamEvents', new \ArrayIterator());
if (! $this->inTransaction($eventStore)) {
if ($event->getParam('streamNotFound', false)
|| $event->getParam('concurrencyException', false)
) {
return;
}
foreach ($recordedEvents as $recordedEvent) {
$this->eventBus->dispatch($recordedEvent);
}
} else {
$this->cachedEventStreams[] = $recordedEvents;
}
}
);
$this->listenerHandlers[] = $eventStore->attach(
ActionEventEmitterEventStore::EVENT_CREATE,
function (ActionEvent $event) use ($eventStore): void {
$stream = $event->getParam('stream');
$recordedEvents = $stream->streamEvents();
if (! $this->inTransaction($eventStore)) {
if ($event->getParam('streamExistsAlready', false)) {
return;
}
foreach ($recordedEvents as $recordedEvent) {
$this->eventBus->dispatch($recordedEvent);
}
} else {
$this->cachedEventStreams[] = $recordedEvents;
}
}
);
if ($eventStore instanceof TransactionalActionEventEmitterEventStore) {
$this->listenerHandlers[] = $eventStore->attach(
TransactionalActionEventEmitterEventStore::EVENT_COMMIT,
function (ActionEvent $event): void {
foreach ($this->cachedEventStreams as $stream) {
foreach ($stream as $recordedEvent) {
$this->eventBus->dispatch($recordedEvent);
}
}
$this->cachedEventStreams = [];
}
);
$this->listenerHandlers[] = $eventStore->attach(
TransactionalActionEventEmitterEventStore::EVENT_ROLLBACK,
function (ActionEvent $event): void {
$this->cachedEventStreams = [];
}
);
}
}
private function inTransaction(EventStore $eventStore): bool
{
return $eventStore instanceof TransactionalActionEventEmitterEventStore
&& $eventStore->inTransaction();
}
}
services:
_defaults:
public: false
Prooph\EventStoreBusBridge\EventPublisher:
class: Acme\Context\Infrastructure\Prooph\EventPublisher
arguments:
#- '@prooph_service_bus.default_event_bus'
- '@messenger.bus.events'
tags:
- { name: 'prooph_event_store.default_store.plugin' }
@webdevilopers
Copy link
Author

Thanks @gquemener !

If I understand the event publisher I use with the symfony event bus here correctly then it guarantees the same, right?

And it is not coupled to the Doctrine middleware since the configuration is appended to the prooph event stores:

# config/packages/prooph_event_store_bus_bridge.yaml

services:
    _defaults:
        public: false

    Prooph\EventStoreBusBridge\EventPublisher:
        class: Acme\Context\Infrastructure\Prooph\EventPublisher
        arguments:
            #- '@prooph_service_bus.default_event_bus'
            - '@messenger.bus.events'
        tags:
            - { name: 'prooph_event_store.default_store.plugin' }

Default PDO configuration for prooph:

# config/packages/prooph_pdo_event_store.yaml

services:
    _defaults:
        public: false

    Prooph\EventSourcing\EventStoreIntegration\AggregateTranslator: null

    Prooph\EventStore\EventStore: '@app.event_store.default'

    app.event_store.default:
        class: Prooph\EventStore\Pdo\MariaDbEventStore
        arguments:
            - '@prooph_event_store.message_factory'
            - '@app.event_store.pdo_connection.mysql'
            - '@app.event_store.mysql.persistence_strategy'

    app.event_store.pdo_connection.mysql:
        class: \PDO
        arguments:
            - '%env(MYSQL_DSN)%'
            - '%env(MYSQL_USER)%'
            - '%env(MYSQL_PASSWORD)%'

    app.event_store.mysql.persistence_strategy:
        class: Prooph\EventStore\Pdo\PersistenceStrategy\MariaDbSingleStreamStrategy

For further reading:

@gquemener
Copy link

If I understand the event publisher I use with the symfony event bus here correctly then it guarantees the same, right?

That's correct.

However, just as I'm coupling my event publisher to doctrine, you're coupling yours to the Prooph EventStore (because it is a plugin).
That's an acceptable tradeoff, but the event dispatching could as well happen in your repository, after you've called the appropriate methods on your storage mecanism (an EventStore, a Doctrine EntityManager, ...), bref any moment after you have the guarantee that the AR has been synchronized with the db:

public TodoRepository implements Repository
{
   public function save(Todo $todo): void
   {
      $this->storage->persist($todo);
      foreach ($todo->popEvents() as $event) {
           $this->eventBus->dispatch($event);
      }
}

You could even use some db mecanism that triggers events on successfull write (binlog, NOTIFY, ...), anything that is a good tradeoff between maintainability and guarantee that the event is emitted when the initial fact has been somehow persisted.

@webdevilopers
Copy link
Author

Thanks for the feedback. Indeed there is coupling to any "third-party" here. But I think that there are a lot prooph event-store users out there that would love to switch to the messenger and use there current config. And finally replace the deprecated service-bus-bridge.

I will try to move my event publisher to a plugin and see if it works. Then we could propose it to other users.

Maybe @weaverryan thinks this is a good enhancement for the messenger+prooph users.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment