Created
April 15, 2023 11:38
-
-
Save vladdnepr/728e1b7b428923443f25d06984034f7a to your computer and use it in GitHub Desktop.
Solution for index aliases in FOSElasticaBundle with Symfony Messenger. Requires Symfony 6.2+.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
namespace App\Subscriber; | |
use FOS\ElasticaBundle\Elastica\Index; | |
use FOS\ElasticaBundle\Event\PostIndexPopulateEvent; | |
use FOS\ElasticaBundle\Event\PreIndexPopulateEvent; | |
use FOS\ElasticaBundle\Index\IndexManager; | |
use FOS\ElasticaBundle\Index\Resetter; | |
use FOS\ElasticaBundle\Message\AsyncPersistPage; | |
use Symfony\Component\DependencyInjection\Attribute\Autowire; | |
use Symfony\Component\EventDispatcher\EventSubscriberInterface; | |
use Symfony\Component\Messenger\Envelope; | |
use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent; | |
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent; | |
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent; | |
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface; | |
use Symfony\Component\Messenger\Transport\Sender\SendersLocator; | |
/**
 * Coordinates FOSElasticaBundle index populating with Symfony Messenger when
 * pages are persisted asynchronously through AsyncPersistPage messages.
 *
 * Flow implemented by this subscriber:
 *  - while populating, every outgoing AsyncPersistPage message is tagged with
 *    the index's current name whenever it differs from the original name;
 *  - the post-populate event is stopped when messages were queued, so the
 *    alias switch does not happen before the workers have done their job;
 *  - in the worker, the index name is overridden with the tagged name before
 *    the message is handled;
 *  - once a message has been handled and the transport reports an empty
 *    queue, the alias is switched and the index refreshed.
 */
final class ElasticaAsyncSubscriber implements EventSubscriberInterface
{
    /** Option key used to carry the current (real) index name inside the message options. */
    public const OPTION_REAL_INDEX_NAME = '_realIndexName';

    /** True once an AsyncPersistPage message has been sent during the current populate run. */
    private bool $isSentToMessenger = false;

    /**
     * @param IndexManager   $indexManager            Used to look indexes up by name.
     * @param Resetter       $resetter                Used to switch the index alias after the last message.
     * @param SendersLocator $messengerSendersLocator Used to find the transport an envelope is routed to.
     */
    public function __construct(
        private IndexManager $indexManager,
        private Resetter $resetter,
        #[Autowire(service: 'messenger.senders_locator')]
        private SendersLocator $messengerSendersLocator,
    ) {
    }

    /**
     * @return array<class-string, array{0: string, 1: int}>
     */
    public static function getSubscribedEvents(): array
    {
        return [
            PreIndexPopulateEvent::class => ['onPreIndexPopulate', 10],
            PostIndexPopulateEvent::class => ['onPostIndexPopulate', 10],
            SendMessageToTransportsEvent::class => ['onSendMessageToTransports', 10],
            WorkerMessageReceivedEvent::class => ['onWorkerMessageReceived', 10],
            WorkerMessageHandledEvent::class => ['onWorkerMessageHandled', 10],
        ];
    }

    /**
     * Clears the "sent to messenger" marker at the start of a populate run.
     */
    public function onPreIndexPopulate(PreIndexPopulateEvent $event): void
    {
        $this->isSentToMessenger = false;
    }

    /**
     * Stops propagation of the post-populate event when pages were queued,
     * because the index switch is performed after the last queued message.
     */
    public function onPostIndexPopulate(PostIndexPopulateEvent $event): void
    {
        if ($this->isSentToMessenger) {
            $event->stopPropagation();
        }

        $this->isSentToMessenger = false;
    }

    /**
     * Marks the populate run as asynchronous and, when the index's current
     * name differs from its original name, re-wraps the message so that its
     * options also carry the current name.
     */
    public function onSendMessageToTransports(SendMessageToTransportsEvent $event): void
    {
        $envelope = $event->getEnvelope();
        $options = $this->getElasticaOptionsFromEnvelope($envelope);
        if (null === $options) {
            return;
        }

        // Remember that the index switch must be postponed (see onPostIndexPopulate).
        $this->isSentToMessenger = true;

        $index = $this->indexManager->getIndex($options['indexName']);
        $currentName = $index->getName();
        if ($index->getOriginalName() === $currentName) {
            // Names are identical: no alias is in use, nothing to tag.
            return;
        }

        /** @var AsyncPersistPage $message */
        $message = $envelope->getMessage();

        // "+" keeps an already present real index name instead of overwriting it.
        $taggedOptions = $message->getOptions() + [
            self::OPTION_REAL_INDEX_NAME => $currentName,
        ];

        // all() groups stamps by class name; array_values() drops those string
        // keys so the spread yields positional arguments, and array_merge()
        // flattens the groups into the flat stamp list Envelope expects.
        $stamps = array_merge(...array_values($envelope->all()));

        $event->setEnvelope(new Envelope(
            new AsyncPersistPage($message->getPage(), $taggedOptions),
            $stamps,
        ));
    }

    /**
     * Points the index at the name the worker has to write to: the real index
     * name carried by the message when present, the original name otherwise.
     */
    public function onWorkerMessageReceived(WorkerMessageReceivedEvent $event): void
    {
        $options = $this->getElasticaOptionsFromEnvelope($event->getEnvelope());
        if (null === $options) {
            return;
        }

        $index = $this->indexManager->getIndex($options['indexName']);

        // Always restore the original name first so an override made for a
        // previous message does not leak into this one.
        $index->overrideName($index->getOriginalName());

        if (isset($options[self::OPTION_REAL_INDEX_NAME])) {
            $index->overrideName($options[self::OPTION_REAL_INDEX_NAME]);
        }
    }

    /**
     * Switches the index alias and refreshes the index once the transport
     * reports that no messages are left in the queue.
     *
     * @throws \RuntimeException When the envelope's transport cannot report its message count.
     */
    public function onWorkerMessageHandled(WorkerMessageHandledEvent $event): void
    {
        $envelope = $event->getEnvelope();
        $options = $this->getElasticaOptionsFromEnvelope($envelope);
        if (null === $options) {
            return;
        }

        $transport = $this->findFirstSender($envelope);
        if (!$transport instanceof MessageCountAwareInterface) {
            throw new \RuntimeException('Transport must implement MessageCountAwareInterface');
        }

        if (0 !== $transport->getMessageCount()) {
            // More messages are still queued; a later message will do the switch.
            return;
        }

        // Last message in the queue: swap old and new index, then refresh.
        $this->resetter->switchIndexAlias($options['indexName'], $options['delete']);
        $this->indexManager->getIndex($options['indexName'])->refresh();
    }

    /**
     * Returns the first sender the locator yields for the envelope, or null
     * when it yields none.
     *
     * getSenders() is only typed as iterable, so the result is iterated
     * instead of being treated as a generator.
     */
    private function findFirstSender(Envelope $envelope): ?object
    {
        foreach ($this->messengerSendersLocator->getSenders($envelope) as $sender) {
            return $sender;
        }

        return null;
    }

    /**
     * Extracts the populate options from an AsyncPersistPage message.
     *
     * Returns null for any other message type, and also for an empty option
     * set, since that carries no index name for the handlers to work with.
     *
     * @return array{
     *     max_per_page: int,
     *     first_page: int,
     *     last_page: int,
     *     delete: bool,
     *     reset: bool,
     *     ignore_errors: bool,
     *     sleep: int,
     *     indexName: string,
     *     _realIndexName?: string,
     * }|null
     */
    private function getElasticaOptionsFromEnvelope(Envelope $envelope): ?array
    {
        $message = $envelope->getMessage();
        if (!$message instanceof AsyncPersistPage) {
            return null;
        }

        /** @var array{
         *     max_per_page: int,
         *     first_page: int,
         *     last_page: int,
         *     delete: bool,
         *     reset: bool,
         *     ignore_errors: bool,
         *     sleep: int,
         *     indexName: string,
         *     _realIndexName?: string,
         * } $options */
        $options = $message->getOptions();

        return [] === $options ? null : $options;
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
класне рішення)