Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
<?php
namespace App\Messenger;
use App\Message\Command\CreateInventor;
use App\Message\Command\UpdateInventor;
use App\Util\QueryStringEncoder;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Stamp\BusNameStamp;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
/**
 * Messenger serializer that maps JSON messages onto the inventor commands.
 *
 * The "type" header selects the command class ("user_created" => CreateInventor,
 * "user_updated" => UpdateInventor) and the JSON body carries the "iri" handed to
 * the command constructor. The last DelayStamp and RedeliveryStamp of an envelope
 * travel as "X-Message-Stamp-*" headers encoded with QueryStringEncoder.
 */
class ExternalJsonMessageSerializer implements SerializerInterface
{
    private const STAMP_HEADER_PREFIX = 'X-Message-Stamp-';

    /**
     * Builds an Envelope from an encoded message.
     *
     * @param array $encodedEnvelope array with a JSON "body" and "headers" (including "type")
     *
     * @throws MessageDecodingFailedException when body/headers/type are missing, the body is
     *                                        not a JSON object/array, or the type is unknown
     */
    public function decode(array $encodedEnvelope): Envelope
    {
        if (empty($encodedEnvelope['body']) || empty($encodedEnvelope['headers'])) {
            throw new MessageDecodingFailedException('Encoded envelope should have at least a "body" and some "headers".');
        }

        if (empty($encodedEnvelope['headers']['type'])) {
            throw new MessageDecodingFailedException('Encoded envelope does not have a "type" header.');
        }

        $type = $encodedEnvelope['headers']['type'];
        $stamps = $this->decodeStamps($encodedEnvelope);

        // json_decode() yields null on malformed JSON and scalars for scalar documents;
        // both would otherwise surface as a TypeError in createEnvelope().
        $data = json_decode($encodedEnvelope['body'], true);
        if (!\is_array($data)) {
            throw new MessageDecodingFailedException('Encoded envelope body is not a valid JSON object.');
        }

        switch ($type) {
            case 'user_created':
                return $this->createEnvelope(CreateInventor::class, $data, $stamps);
            case 'user_updated':
                return $this->createEnvelope(UpdateInventor::class, $data, $stamps);
        }

        throw new MessageDecodingFailedException(sprintf('Invalid type "%s"', $type));
    }

    /**
     * Encodes an Envelope into a "headers" + JSON "body" array.
     *
     * @throws \Exception when the message class is neither CreateInventor nor UpdateInventor
     */
    public function encode(Envelope $envelope): array
    {
        $message = $envelope->getMessage();

        switch (\get_class($message)) {
            case CreateInventor::class:
                return $this->encodeEnvelope($envelope, 'user_created');
            case UpdateInventor::class:
                return $this->encodeEnvelope($envelope, 'user_updated');
        }

        throw new \Exception(sprintf('Serializer do not support encoding message from class: "%s"', \get_class($message)));
    }

    /**
     * Instantiates the command from the decoded body and wraps it in an Envelope
     * carrying the decoded stamps plus a BusNameStamp for "command.bus".
     *
     * @throws MessageDecodingFailedException when the body has no "iri" key
     */
    private function createEnvelope(string $messageClass, array $data, array $stamps): Envelope
    {
        if (!isset($data['iri'])) {
            throw new MessageDecodingFailedException('Missing the IRI key!');
        }

        $message = new $messageClass($data['iri']);
        $envelope = new Envelope($message, $stamps);

        return $envelope->with(new BusNameStamp('command.bus'));
    }

    /**
     * Produces the encoded array for a message: the "type" header, the stamp headers
     * and a JSON body holding the message IRI.
     */
    private function encodeEnvelope(Envelope $envelope, string $type): array
    {
        $headers = array_merge(['type' => $type], $this->encodeStamps($envelope));
        $body = ['iri' => $envelope->getMessage()->getIri()];

        return [
            'headers' => $headers,
            'body' => json_encode($body),
        ];
    }

    /**
     * Converts the last DelayStamp and the last RedeliveryStamp of the envelope
     * into "X-Message-Stamp-*" headers. Returns an empty array when neither is present.
     */
    private function encodeStamps(Envelope $envelope): array
    {
        $headers = [];

        // Retrieve the last DelayStamp
        /** @var DelayStamp|null $delayStamp */
        $delayStamp = $envelope->last(DelayStamp::class);
        if (null !== $delayStamp) {
            $headers[self::STAMP_HEADER_PREFIX.'Delay'] = QueryStringEncoder::encode([
                'delay' => $delayStamp->getDelay(),
            ]);
        }

        // Retrieve the last RedeliveryStamp
        /** @var RedeliveryStamp|null $redeliveryStamp */
        $redeliveryStamp = $envelope->last(RedeliveryStamp::class);
        if (null !== $redeliveryStamp) {
            $headers[self::STAMP_HEADER_PREFIX.'Redelivery'] = QueryStringEncoder::encode([
                'retryCount' => $redeliveryStamp->getRetryCount(),
                'senderClass' => $redeliveryStamp->getSenderClassOrAlias(),
            ]);
        }

        return $headers;
    }

    /**
     * Rebuilds the DelayStamp / RedeliveryStamp instances from the stamp headers.
     *
     * @throws MessageDecodingFailedException when a stamp header lacks a required option
     *                                        or a numeric option is not numeric
     */
    private function decodeStamps(array $encodedEnvelope): array
    {
        $headers = $encodedEnvelope['headers'];
        $stamps = [];

        // Convert DelayStamp
        $delayHeader = self::STAMP_HEADER_PREFIX.'Delay';
        if (isset($headers[$delayHeader])) {
            $options = QueryStringEncoder::decode($headers[$delayHeader]);
            if (!isset($options['delay']) || !\is_numeric($options['delay'])) {
                throw new MessageDecodingFailedException(sprintf('Invalid "%s" header.', $delayHeader));
            }

            // Decoded options are strings; cast explicitly instead of relying on coercion.
            $stamps[] = new DelayStamp((int) $options['delay']);
        }

        // Convert RedeliveryStamp
        $redeliveryHeader = self::STAMP_HEADER_PREFIX.'Redelivery';
        if (isset($headers[$redeliveryHeader])) {
            $options = QueryStringEncoder::decode($headers[$redeliveryHeader]);
            if (!isset($options['retryCount'], $options['senderClass']) || !\is_numeric($options['retryCount'])) {
                throw new MessageDecodingFailedException(sprintf('Invalid "%s" header.', $redeliveryHeader));
            }

            $stamps[] = new RedeliveryStamp((int) $options['retryCount'], $options['senderClass']);
        }

        return $stamps;
    }
}
<?php
namespace App\Util;
/**
 * Encodes and decodes flat key/value option sets as a ";"-separated query string
 * (e.g. "retryCount=1;senderClass=App%5CSender").
 */
class QueryStringEncoder
{
    /**
     * Serializes the options with http_build_query(), using ";" as the pair separator.
     * Keys and values are URL-encoded by http_build_query().
     */
    public static function encode(array $data): string
    {
        return http_build_query($data, '', ';');
    }

    /**
     * Parses a string produced by encode() back into a flat array of strings.
     *
     * Keys and values are URL-decoded so that the result mirrors what was passed to
     * encode(). Empty segments are skipped (an empty input yields an empty array) and
     * a segment without "=" maps its key to an empty string.
     *
     * @return array<string, string>
     */
    public static function decode(string $queryString): array
    {
        $decoded = [];

        foreach (explode(';', $queryString) as $pair) {
            if ('' === $pair) {
                continue;
            }

            // Limit to 2 parts so a raw "=" inside the value is not truncated.
            $parts = explode('=', $pair, 2);
            $decoded[urldecode($parts[0])] = urldecode($parts[1] ?? '');
        }

        return $decoded;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.