Skip to content

Instantly share code, notes, and snippets.

@webdevilopers
Created June 4, 2020 09:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save webdevilopers/5903d2c217aea5fbe31a3b86e13ee9da to your computer and use it in GitHub Desktop.
Save webdevilopers/5903d2c217aea5fbe31a3b86e13ee9da to your computer and use it in GitHub Desktop.
Replacing Prooph Service Event Bus with Symfony Messenger using custom Event Publisher
<?php
namespace Acme\Context\Infrastructure\Prooph;
use Prooph\Common\Event\ActionEvent;
use Prooph\EventStore\ActionEventEmitterEventStore;
use Prooph\EventStore\EventStore;
use Prooph\EventStore\Plugin\AbstractPlugin;
use Prooph\EventStore\TransactionalActionEventEmitterEventStore;
use Symfony\Component\Messenger\MessageBusInterface;
final class EventPublisher extends AbstractPlugin
{
/**
* @var MessageBusInterface
*/
private $eventBus;
/**
* @var \Iterator[]
*/
private $cachedEventStreams = [];
public function __construct(MessageBusInterface $eventBus)
{
$this->eventBus = $eventBus;
}
public function attachToEventStore(ActionEventEmitterEventStore $eventStore): void
{
$this->listenerHandlers[] = $eventStore->attach(
ActionEventEmitterEventStore::EVENT_APPEND_TO,
function (ActionEvent $event) use ($eventStore): void {
$recordedEvents = $event->getParam('streamEvents', new \ArrayIterator());
if (! $this->inTransaction($eventStore)) {
if ($event->getParam('streamNotFound', false)
|| $event->getParam('concurrencyException', false)
) {
return;
}
foreach ($recordedEvents as $recordedEvent) {
$this->eventBus->dispatch($recordedEvent);
}
} else {
$this->cachedEventStreams[] = $recordedEvents;
}
}
);
$this->listenerHandlers[] = $eventStore->attach(
ActionEventEmitterEventStore::EVENT_CREATE,
function (ActionEvent $event) use ($eventStore): void {
$stream = $event->getParam('stream');
$recordedEvents = $stream->streamEvents();
if (! $this->inTransaction($eventStore)) {
if ($event->getParam('streamExistsAlready', false)) {
return;
}
foreach ($recordedEvents as $recordedEvent) {
$this->eventBus->dispatch($recordedEvent);
}
} else {
$this->cachedEventStreams[] = $recordedEvents;
}
}
);
if ($eventStore instanceof TransactionalActionEventEmitterEventStore) {
$this->listenerHandlers[] = $eventStore->attach(
TransactionalActionEventEmitterEventStore::EVENT_COMMIT,
function (ActionEvent $event): void {
foreach ($this->cachedEventStreams as $stream) {
foreach ($stream as $recordedEvent) {
$this->eventBus->dispatch($recordedEvent);
}
}
$this->cachedEventStreams = [];
}
);
$this->listenerHandlers[] = $eventStore->attach(
TransactionalActionEventEmitterEventStore::EVENT_ROLLBACK,
function (ActionEvent $event): void {
$this->cachedEventStreams = [];
}
);
}
}
private function inTransaction(EventStore $eventStore): bool
{
return $eventStore instanceof TransactionalActionEventEmitterEventStore
&& $eventStore->inTransaction();
}
}
services:
_defaults:
public: false
Prooph\EventStoreBusBridge\EventPublisher:
class: Acme\Context\Infrastructure\Prooph\EventPublisher
arguments:
#- '@prooph_service_bus.default_event_bus'
- '@messenger.bus.events'
tags:
- { name: 'prooph_event_store.default_store.plugin' }
@webdevilopers
Copy link
Author

Thanks for your feedback on twitter @gquemener.

I really like the idea of moving this "event publisher" to the Symfony Messenger middleware. I guess it will look similar to this:

Could you publish your code?

Can you tell if the functionality itself is any different from tea approach in this gist?

@gquemener
Copy link

The middleware looks somehow like this:

<?php

declare(strict_types=1);

namespace Infrastructure\Messenger;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;

final class DispatchDomainEventsMiddleware implements MiddlewareInterface
{
    private AggregateRootsCollector $collector;

    private EventBus $eventBus;

    public function __construct(AggregateRootsCollector $collector, EventBus $eventBus)
    {
        $this->collector = $collector;
        $this->eventBus = $eventBus;
    }

    public function handle(Envelope $envelope, StackInterface $stack): Envelope
    {
        $envelope = $stack->next()->handle($envelope, $stack);

        $aggregateRoots = $this->collector->getAggregateRoots();
        $this->collector->reset();
        foreach ($aggregateRoots as $aggregateRoot) {
            foreach ($aggregateRoot->popEvents() as $event) {
                $this->eventBus->dispatch($event);
            }
        }

        return $envelope;
    }
}

The AR collector is a doctrine listener (postPersist, postLoad, postUpdate and postRemove) that keeps a reference to any AR going through the entity manager.

This solution is not ideal because it is tight to the command bus (events are dispatched after the command has been handled), whereas I realize it should be tight to the AR repository (or the underlying storage, like the EventPublisher is doing), but for my current project this is an acceptable trade-off.

The main concern is to have the guarantee that the events are dispatched when, and only when, db transaction has been committed.

@webdevilopers
Copy link
Author

Thanks @gquemener !

If I understand the event publisher I use with the symfony event bus here correctly then it guarantees the same, right?

And it is not coupled to the Doctrine middleware since the configuration is appended to the prooph event stores:

# config/packages/prooph_event_store_bus_bridge.yaml

services:
    _defaults:
        public: false

    Prooph\EventStoreBusBridge\EventPublisher:
        class: Acme\Context\Infrastructure\Prooph\EventPublisher
        arguments:
            #- '@prooph_service_bus.default_event_bus'
            - '@messenger.bus.events'
        tags:
            - { name: 'prooph_event_store.default_store.plugin' }

Default PDO configuration for prooph:

# config/packages/prooph_pdo_event_store.yaml

services:
    _defaults:
        public: false

    Prooph\EventSourcing\EventStoreIntegration\AggregateTranslator: null

    Prooph\EventStore\EventStore: '@app.event_store.default'

    app.event_store.default:
        class: Prooph\EventStore\Pdo\MariaDbEventStore
        arguments:
            - '@prooph_event_store.message_factory'
            - '@app.event_store.pdo_connection.mysql'
            - '@app.event_store.mysql.persistence_strategy'

    app.event_store.pdo_connection.mysql:
        class: \PDO
        arguments:
            - '%env(MYSQL_DSN)%'
            - '%env(MYSQL_USER)%'
            - '%env(MYSQL_PASSWORD)%'

    app.event_store.mysql.persistence_strategy:
        class: Prooph\EventStore\Pdo\PersistenceStrategy\MariaDbSingleStreamStrategy

For further reading:

@gquemener
Copy link

If I understand the event publisher I use with the symfony event bus here correctly then it guarantees the same, right?

That's correct.

However, just as I'm coupling my event publisher to doctrine, you're coupling yours to the Prooph EventStore (because it is a plugin).
That's an acceptable tradeoff, but the event dispatching could as well happen in your repository, after you've called the appropriate methods on your storage mecanism (an EventStore, a Doctrine EntityManager, ...), bref any moment after you have the guarantee that the AR has been synchronized with the db:

public TodoRepository implements Repository
{
   public function save(Todo $todo): void
   {
      $this->storage->persist($todo);
      foreach ($todo->popEvents() as $event) {
           $this->eventBus->dispatch($event);
      }
}

You could even use some db mecanism that triggers events on successfull write (binlog, NOTIFY, ...), anything that is a good tradeoff between maintainability and guarantee that the event is emitted when the initial fact has been somehow persisted.

@webdevilopers
Copy link
Author

Thanks for the feedback. Indeed there is coupling to any "third-party" here. But I think that there are a lot prooph event-store users out there that would love to switch to the messenger and use there current config. And finally replace the deprecated service-bus-bridge.

I will try to move my event publisher to a plugin and see if it works. Then we could propose it to other users.

Maybe @weaverryan thinks this is a good enhancement for the messenger+prooph users.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment