Skip to content

Instantly share code, notes, and snippets.

@jwage
Created March 6, 2024 18:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jwage/3dfb8f42c36dd6a9b16f3641ab9d77a0 to your computer and use it in GitHub Desktop.
Save jwage/3dfb8f42c36dd6a9b16f3641ab9d77a0 to your computer and use it in GitHub Desktop.
Symfony MessageBusInterface implementation that handles AMQPConnectionException and retries
<?php
declare(strict_types=1);
namespace App\Messenger;
use AMQPConnectionException;
use App\Event\Messenger\WorkerMessageDispatchedEvent;
use App\Messenger\Stamp\DispatchedAtStamp;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\TransportNamesStamp;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
use Throwable;
/**
 * Message bus decorator that stamps outgoing messages, announces the dispatch
 * through the event dispatcher and retries the send when the AMQP connection fails.
 *
 * Every dispatched message receives a DispatchedAtStamp. Messages implementing
 * RoutableMessage additionally receive a TransportNamesStamp built from
 * getTransportNames(). A WorkerMessageDispatchedEvent carrying the envelope is
 * dispatched before the envelope is handed to the wrapped bus.
 */
class MessageBus implements MessageBusInterface
{
    /** Maximum number of additional dispatch attempts after a retryable failure. */
    private const MAX_RETRIES = 1;

    public function __construct(
        private MessageBusInterface $wrappedBus,
        private EventDispatcherInterface $eventDispatcher,
        private LoggerInterface $logger,
    ) {
    }

    /**
     * Stamps the message, emits a WorkerMessageDispatchedEvent and forwards the
     * envelope to the wrapped bus, retrying on AMQP connection failures.
     *
     * @param object                                               $message The message, or the message pre-wrapped in an Envelope.
     * @param \Symfony\Component\Messenger\Stamp\StampInterface[] $stamps  Extra stamps to attach to the envelope.
     *
     * @return Envelope The envelope returned by the wrapped bus.
     *
     * @throws Throwable Whatever the wrapped bus throws, once the failure is not
     *                   retryable or MAX_RETRIES has been exhausted.
     */
    public function dispatch(object $message, array $stamps = []): Envelope
    {
        // The bus contract allows callers to pass an Envelope; routing and logging
        // must look at the message inside it, not at the Envelope itself.
        $payload = $message instanceof Envelope ? $message->getMessage() : $message;

        $stamps[] = new DispatchedAtStamp();

        if ($payload instanceof RoutableMessage) {
            $stamps[] = new TransportNamesStamp($payload->getTransportNames());
        }

        $event = new WorkerMessageDispatchedEvent(Envelope::wrap($message, $stamps));
        $this->eventDispatcher->dispatch($event);

        $retries = 0;

        while (true) {
            try {
                // Read the envelope back from the event on every attempt so that
                // whatever envelope the event currently holds is the one sent.
                return $this->wrappedBus->dispatch($event->getEnvelope());
            } catch (Throwable $e) {
                // Short-circuit keeps the counter untouched for non-retryable failures.
                if (! $this->isRetryableException($e) || ++$retries > self::MAX_RETRIES) {
                    throw $e;
                }

                $this->logger->warning('Retrying send message {class}', [
                    'class' => $payload::class,
                    'exception' => $e,
                    'message' => $e->getMessage(),
                    'retries' => $retries,
                    'maxRetries' => self::MAX_RETRIES,
                ]);
            }
        }
    }

    /**
     * Proxies all method calls to the wrapped bus.
     *
     * Only reached for methods this class does not define itself; the call is
     * forwarded verbatim and its result returned.
     *
     * @param array<mixed> $arguments
     */
    public function __call(string $method, array $arguments): mixed
    {
        return $this->wrappedBus->{$method}(...$arguments);
    }

    /**
     * Tells whether a failed dispatch should be attempted again.
     *
     * Only the immediate previous exception is inspected: the failure is
     * retryable when it directly wraps an AMQPConnectionException. Deeper links
     * of the exception chain are intentionally not followed here.
     */
    private function isRetryableException(Throwable $e): bool
    {
        return $e->getPrevious() instanceof AMQPConnectionException;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment