Skip to content

Instantly share code, notes, and snippets.

@kriswallsmith
Last active February 13, 2024 15:07
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kriswallsmith/6514ca5a30db352ad0aea328607546b4 to your computer and use it in GitHub Desktop.
Save kriswallsmith/6514ca5a30db352ad0aea328607546b4 to your computer and use it in GitHub Desktop.
Symfony Messenger message chaining
<?php
declare(strict_types=1);
namespace App\Messenger\Chain;
use App\Messenger\Chain\Stamp\ChainStamp;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\StampInterface;
final readonly class Chain
{
/**
* @param list<object|Envelope|list<object|Envelope>> $chain A list of messages or envelopes
* @param StampInterface[] $stamps Stamps to add to each envelope
*/
public static function make(array $chain, array $stamps = []): Envelope
{
if (!$chain) {
throw new \InvalidArgumentException('Message chain must have at least one item.');
}
if (!is_array($leaves = array_pop($chain))) {
$chain[] = $leaves;
$leaves = [];
}
return Envelope::wrap(array_shift($chain), $stamps)->with(...ChainStamp::fromChain(
chain: $chain,
leaves: $leaves,
stamps: $stamps,
));
}
private function __construct() {}
}
<?php
declare(strict_types=1);
namespace App\Messenger\Chain\Event;
use Symfony\Component\Messenger\Envelope;
final readonly class ChainedMessageFailedEvent
{
public function __construct(
public Envelope $envelope,
public Envelope $failedEnvelope,
public \Throwable $throwable,
) {}
}
<?php
declare(strict_types=1);
namespace App\Messenger\Chain;
use App\Messenger\Chain\Event\ChainedMessageFailedEvent;
use App\Messenger\Chain\Stamp\ChainParentStamp;
use App\Messenger\Chain\Stamp\ChainStamp;
use Psr\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;
use Symfony\Component\Messenger\Stamp\HandledStamp;
use Symfony\Component\Messenger\Stamp\SerializedMessageStamp;
use Symfony\Component\Messenger\Stamp\StampInterface;
final readonly class ChainMiddleware implements MiddlewareInterface
{
public function __construct(
private MessageBusInterface $bus,
private ?EventDispatcherInterface $dispatcher = null,
) {}
public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
try {
$envelope = $stack->next()->handle($envelope, $stack);
} catch (\Throwable $e) {
if ($parentStamp = $envelope->last(ChainParentStamp::class)) {
$this->bubbleError($parentStamp, $envelope, $e);
}
throw $e;
}
if (!$handledStamp = $envelope->last(HandledStamp::class)) {
// for example if the send message middleware intercepted this message
return $envelope;
}
/** @var ChainStamp $chainStamp */
foreach ($envelope->all(ChainStamp::class) as $chainStamp) {
if (!$nextEnvelope = $this->nextEnvelope($envelope->getMessage(), $handledStamp, $chainStamp)) {
continue;
}
// dispatch the chained message
$this->bus->dispatch($nextEnvelope->with(new ChainParentStamp($envelope)));
}
// remove chain stamps
return $envelope->withoutAll(ChainStamp::class);
}
/**
* Dispatches a failure event for each ancestor in the chain.
*/
private function bubbleError(ChainParentStamp $parentStamp, Envelope $envelope, \Throwable $e): void
{
do {
$this->dispatcher?->dispatch(new ChainedMessageFailedEvent(
envelope: $parentStamp->parentEnvelope,
failedEnvelope: $envelope,
throwable: $e,
));
} while ($parentStamp = $parentStamp->parentEnvelope->last(ChainParentStamp::class));
}
/**
* Returns the envelope to dispatch after the handled message.
*/
private function nextEnvelope(object $handledMessage, HandledStamp $handledStamp, ChainStamp $chainStamp): ?Envelope
{
$envelope = $chainStamp->envelope;
$message = $envelope->getMessage();
$stamps = self::stamps($envelope);
if (!$message instanceof MessageChainInterface) {
return $envelope;
}
if (!$message = $message->next($handledMessage, $handledStamp->getResult())) {
// skip this stamp
return null;
}
if ($message instanceof Envelope) {
// *append* stamps from chained envelope and open the message
$stamps = [
...$stamps,
...self::stamps($message),
];
$message = $message->getMessage();
}
// force the serializer to re-serialize this message
return (new Envelope($message, $stamps))->withoutAll(SerializedMessageStamp::class);
}
/** @return StampInterface[] */
private static function stamps(Envelope $envelope): array
{
return $envelope->all() ? array_merge(...array_values($envelope->all())) : [];
}
}
<?php
declare(strict_types=1);
namespace App\Tests\Messenger\Chain;
use App\Messenger\Chain\ChainMiddleware;
use App\Messenger\Chain\Event\ChainedMessageFailedEvent;
use App\Messenger\Chain\MessageChainInterface;
use App\Messenger\Chain\Stamp\ChainParentStamp;
use App\Messenger\Chain\Stamp\ChainStamp;
use PHPUnit\Framework\TestCase;
use Psr\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;
use Symfony\Component\Messenger\Stamp\BusNameStamp;
use Symfony\Component\Messenger\Stamp\HandledStamp;
final class ChainMiddlewareTest extends TestCase
{
public function testChainedMessage(): void
{
$message = (object) [];
$nextMessage = (object) [];
$envelope = new Envelope($message, [
new HandledStamp(null, 'handler'),
ChainStamp::wrap($nextMessage),
]);
$bus = $this->createMock(MessageBusInterface::class);
$bus->expects(self::once())
->method('dispatch')
->with(self::callback(function(Envelope $nextEnvelope) use ($nextMessage): bool {
self::assertSame($nextMessage, $nextEnvelope->getMessage());
return true;
}))
->willReturn(new Envelope($nextMessage));
$next = $this->createStub(MiddlewareInterface::class);
$next->method('handle')->willReturn($envelope);
$stack = $this->createStub(StackInterface::class);
$stack->method('next')->willReturn($next);
$middleware = new ChainMiddleware($bus);
$middleware->handle($envelope, $stack);
}
public function testChainedMessageChain(): void
{
$message = (object) [];
$nextMessage = new class() implements MessageChainInterface {
public ?object $message = null;
public mixed $result = null;
public function next(object $handledMessage, mixed $handledResult): object
{
$this->message = $handledMessage;
$this->result = $handledResult;
return $this;
}
};
$envelope = new Envelope($message, [
new HandledStamp('result', 'handler'),
ChainStamp::wrap($nextMessage),
]);
$bus = $this->createMock(MessageBusInterface::class);
$bus->expects(self::once())
->method('dispatch')
->willReturn(new Envelope($nextMessage));
$next = $this->createStub(MiddlewareInterface::class);
$next->method('handle')->willReturn($envelope);
$stack = $this->createStub(StackInterface::class);
$stack->method('next')->willReturn($next);
$middleware = new ChainMiddleware($bus);
$middleware->handle($envelope, $stack);
self::assertSame($message, $nextMessage->message);
self::assertSame('result', $nextMessage->result);
}
public function testChainedMessageChainEnvelopeReturn(): void
{
$message = (object) [];
$nextMessage = new class() implements MessageChainInterface {
public function next(object $handledMessage, mixed $handledResult): object
{
return new Envelope((object) ['result' => $handledResult], [new BusNameStamp('bus2')]);
}
};
$envelope = new Envelope($message, [
new HandledStamp('result', 'handler'),
ChainStamp::wrap($nextMessage, [new BusNameStamp('bus1')]),
]);
$bus = $this->createMock(MessageBusInterface::class);
$bus->expects(self::once())
->method('dispatch')
->with(self::callback(function(Envelope $nextEnvelope): bool {
self::assertEquals((object) ['result' => 'result'], $nextEnvelope->getMessage());
self::assertEquals([
new BusNameStamp('bus1'),
new BusNameStamp('bus2'),
], $nextEnvelope->all(BusNameStamp::class));
return true;
}))
->willReturn(new Envelope($nextMessage));
$next = $this->createStub(MiddlewareInterface::class);
$next->method('handle')->willReturn($envelope);
$stack = $this->createStub(StackInterface::class);
$stack->method('next')->willReturn($next);
$middleware = new ChainMiddleware($bus);
$middleware->handle($envelope, $stack);
}
public function testChainableStamps(): void
{
$message = (object) [];
$nextMessage = (object) [];
$nextStamp = new BusNameStamp('bus');
$envelope = new Envelope($message, [
new HandledStamp('result', 'handler'),
ChainStamp::wrap($nextMessage, [$nextStamp]),
]);
$bus = $this->createMock(MessageBusInterface::class);
$bus->expects(self::once())
->method('dispatch')
->with(
self::callback(function(Envelope $nextEnvelope) use ($nextMessage, $nextStamp): bool {
self::assertSame($nextMessage, $nextEnvelope->getMessage());
self::assertSame($nextStamp, $nextEnvelope->last(BusNameStamp::class));
return true;
}),
)
->willReturn(new Envelope($nextMessage));
$next = $this->createStub(MiddlewareInterface::class);
$next->method('handle')->willReturn($envelope);
$stack = $this->createStub(StackInterface::class);
$stack->method('next')->willReturn($next);
$middleware = new ChainMiddleware($bus);
$middleware->handle($envelope, $stack);
}
public function testErrorBubbling(): void
{
$exception = new \LogicException('Test exception');
$this->expectExceptionObject($exception);
$parentEnvelope = new Envelope((object) []);
$envelope = new Envelope((object) [], [new ChainParentStamp($parentEnvelope)]);
$stack = $this->createStub(StackInterface::class);
$nextMiddleware = $this->createStub(MiddlewareInterface::class);
$stack->method('next')->willReturn($nextMiddleware);
$nextMiddleware->method('handle')->willThrowException($exception);
$bus = $this->createStub(MessageBusInterface::class);
$dispatcher = $this->createMock(EventDispatcherInterface::class);
$dispatcher->expects(self::once())
->method('dispatch')
->with(self::isInstanceOf(ChainedMessageFailedEvent::class));
$middleware = new ChainMiddleware($bus, $dispatcher);
$middleware->handle($envelope, $stack);
}
}
<?php
declare(strict_types=1);
namespace App\Messenger\Chain\Stamp;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\StampInterface;
/**
* Stamp applied to chained messages to maintain the link to their parent.
*/
final readonly class ChainParentStamp implements StampInterface
{
public function __construct(
/** The prior message in the chain. */
public Envelope $parentEnvelope,
) {}
}
<?php
declare(strict_types=1);
namespace App\Messenger\Chain\Serializer;
use App\Messenger\Chain\Stamp\ChainParentStamp;
use Psr\Container\ContainerInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Serializer\Normalizer\DenormalizerInterface;
use Symfony\Component\Serializer\Normalizer\NormalizerInterface;
use Symfony\Contracts\Service\ServiceSubscriberInterface;
final readonly class ChainParentStampNormalizer implements NormalizerInterface, DenormalizerInterface, ServiceSubscriberInterface
{
public static function getSubscribedServices(): array
{
return [
SerializerInterface::class,
];
}
public function __construct(
private ContainerInterface $container,
) {}
/**
* @param array{headers: array<string, mixed>, body: array<string, mixed>} $data
* @param array<string, mixed> $context
*/
public function denormalize(mixed $data, string $type, string $format = null, array $context = []): ChainParentStamp
{
$serializer = $this->container->get(SerializerInterface::class);
return new ChainParentStamp($serializer->decode([
'headers' => $data['headers'],
'body' => json_encode($data['body']),
]));
}
/** @param array<string, mixed> $context */
public function supportsDenormalization(mixed $data, string $type, string $format = null, array $context = []): bool
{
return ChainParentStamp::class === $type;
}
/**
* @param ChainParentStamp $object
* @param array<string, mixed> $context
* @return array{headers: array<string, mixed>, body: array<string, mixed>}
*/
public function normalize(mixed $object, string $format = null, array $context = []): array
{
$serializer = $this->container->get(SerializerInterface::class);
[
'headers' => $headers,
'body' => $encodedBody,
] = $serializer->encode($object->parentEnvelope);
return [
'headers' => $headers,
'body' => json_decode($encodedBody, associative: true),
];
}
/** @param array<string, mixed> $context */
public function supportsNormalization(mixed $data, string $format = null, array $context = []): bool
{
return $data instanceof ChainParentStamp;
}
/** @return array<string, bool> */
public function getSupportedTypes(?string $format): array
{
return [
ChainParentStamp::class => true,
];
}
}
<?php
declare(strict_types=1);
namespace App\Messenger\Chain\Stamp;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\StampInterface;
/**
* Chains a message to be dispatched after the parent message is handled.
*
* If the message implements {@link MessageChainInterface} the result of
* parent message will be passed to the message.
*/
final readonly class ChainStamp implements StampInterface
{
/**
* Turns the supplied message chain into a list of stamps.
*
* @param object[] $chain A list of messages or envelopes to dispatch sequentially
* @param object[] $leaves A list of messages or envelopes to dispatch in parallel, after the chain
* @param StampInterface[] $stamps Stamps to add to each envelope
*
* @return ChainStamp[]
*/
public static function fromChain(array $chain, array $leaves, array $stamps = []): array
{
$chainStamps = array_map(fn(object $leaf) => self::wrap($leaf, $stamps), $leaves);
while ($chain) {
$chainStamps = [
self::wrap(array_pop($chain), [
...$stamps,
...$chainStamps,
]),
];
}
return $chainStamps;
}
/** @param StampInterface[] $stamps */
public static function wrap(object $message, array $stamps = []): self
{
return new self(Envelope::wrap($message, $stamps));
}
public function __construct(
public Envelope $envelope,
) {}
}
<?php
declare(strict_types=1);
namespace App\Messenger\Chain\Serializer;
use App\Messenger\Chain\Stamp\ChainStamp;
use Psr\Container\ContainerInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Serializer\Normalizer\DenormalizerInterface;
use Symfony\Component\Serializer\Normalizer\NormalizerInterface;
use Symfony\Contracts\Service\ServiceSubscriberInterface;
final readonly class ChainStampNormalizer implements NormalizerInterface, DenormalizerInterface, ServiceSubscriberInterface
{
public static function getSubscribedServices(): array
{
return [
SerializerInterface::class,
];
}
public function __construct(
private ContainerInterface $container,
) {}
/**
* @param array{headers: array<string, mixed>, body: array<string, mixed>} $data
* @param array<string, mixed> $context
*/
public function denormalize(mixed $data, string $type, string $format = null, array $context = []): ChainStamp
{
$serializer = $this->container->get(SerializerInterface::class);
return new ChainStamp($serializer->decode([
'headers' => $data['headers'],
'body' => json_encode($data['body']),
]));
}
/** @param array<string, mixed> $context */
public function supportsDenormalization(mixed $data, string $type, string $format = null, array $context = []): bool
{
return ChainStamp::class === $type;
}
/**
* @param ChainStamp $object
* @param array<string, mixed> $context
* @return array{headers: array<string, mixed>, body: array<string, mixed>}
*/
public function normalize(mixed $object, string $format = null, array $context = []): array
{
$serializer = $this->container->get(SerializerInterface::class);
[
'headers' => $headers,
'body' => $encodedBody,
] = $serializer->encode($object->envelope);
return [
'headers' => $headers,
'body' => json_decode($encodedBody, associative: true),
];
}
/** @param array<string, mixed> $context */
public function supportsNormalization(mixed $data, string $format = null, array $context = []): bool
{
return $data instanceof ChainStamp;
}
/** @return array<string, bool> */
public function getSupportedTypes(?string $format): array
{
return [
ChainStamp::class => true,
];
}
}
<?php
declare(strict_types=1);
namespace App\Tests\Messenger\Chain;
use App\Messenger\Chain\Chain;
use App\Messenger\Chain\Stamp\ChainStamp;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\BusNameStamp;
final class ChainTest extends TestCase
{
public function testMake(): void
{
$message1 = (object) ['id' => '1'];
$message2 = (object) ['id' => '2'];
$message3a = (object) ['id' => '3a'];
$message3b = (object) ['id' => '3b'];
$envelope = Chain::make([
$message1,
new Envelope($message2, [new BusNameStamp('bus2')]),
[$message3a, $message3b],
], [
new BusNameStamp('bus1'),
]);
/** @var ChainStamp[] $chainStamps */
$chainStamps = $envelope->all(ChainStamp::class);
self::assertCount(1, $chainStamps);
self::assertSame($message2, $chainStamps[0]->envelope->getMessage());
// bus1, bus2
self::assertCount(2, $chainStamps[0]->envelope->all(BusNameStamp::class));
// message3a, message3b
self::assertCount(2, $chainStamps[0]->envelope->all(ChainStamp::class));
}
public function testMakeOneItem(): void
{
$message = (object) [];
$envelope = Chain::make([$message]);
self::assertSame($message, $envelope->getMessage());
self::assertEmpty($envelope->all(ChainStamp::class));
}
}
<?php
declare(strict_types=1);
namespace App\Messenger\Chain;
use Symfony\Component\Messenger\Envelope;
interface MessageChainInterface
{
/**
* Receives the parent message and result.
*
* @param object $handledMessage The parent message
* @param mixed $handledResult The result of the parent message
*
* @return object|Envelope|null The chained message or envelope to run next
*/
public function next(object $handledMessage, mixed $handledResult): ?object;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment