Skip to content

Instantly share code, notes, and snippets.

@vladdnepr
Created April 15, 2023 11:38
Show Gist options
  • Save vladdnepr/728e1b7b428923443f25d06984034f7a to your computer and use it in GitHub Desktop.
Save vladdnepr/728e1b7b428923443f25d06984034f7a to your computer and use it in GitHub Desktop.
Solution For Index Aliases in FOS Elastica Bundle With Symfony Messenger. Required Symfony 6.2+
<?php
namespace App\Subscriber;
use FOS\ElasticaBundle\Elastica\Index;
use FOS\ElasticaBundle\Event\PostIndexPopulateEvent;
use FOS\ElasticaBundle\Event\PreIndexPopulateEvent;
use FOS\ElasticaBundle\Index\IndexManager;
use FOS\ElasticaBundle\Index\Resetter;
use FOS\ElasticaBundle\Message\AsyncPersistPage;
use Symfony\Component\DependencyInjection\Attribute\Autowire;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
use Symfony\Component\Messenger\Transport\Sender\SendersLocator;
final class ElasticaAsyncSubscriber implements EventSubscriberInterface
{
public const OPTION_REAL_INDEX_NAME = '_realIndexName';
private IndexManager $indexManager;
private Resetter $resetter;
private SendersLocator $messengerSendersLocator;
private bool $isSentToMessenger = false;
public function __construct(
IndexManager $indexManager,
Resetter $resetter,
#[Autowire(service: 'messenger.senders_locator')] SendersLocator $messengerSendersLocator,
) {
$this->indexManager = $indexManager;
$this->resetter = $resetter;
$this->messengerSendersLocator = $messengerSendersLocator;
}
public static function getSubscribedEvents(): array
{
return [
PreIndexPopulateEvent::class => ['onPreIndexPopulate', 10],
PostIndexPopulateEvent::class => ['onPostIndexPopulate', 10],
SendMessageToTransportsEvent::class => ['onSendMessageToTransports', 10],
WorkerMessageReceivedEvent::class => ['onWorkerMessageReceived', 10],
WorkerMessageHandledEvent::class => ['onWorkerMessageHandled', 10],
];
}
public function onPreIndexPopulate(PreIndexPopulateEvent $event): void
{
$this->isSentToMessenger = false;
}
public function onPostIndexPopulate(PostIndexPopulateEvent $event): void
{
if ($this->isSentToMessenger) {
// Stop index switch, because we do it in last messenger message
$event->stopPropagation();
}
$this->isSentToMessenger = false;
}
public function onSendMessageToTransports(SendMessageToTransportsEvent $event): void
{
if ($options = $this->getElasticaOptionsFromEnvelope($event->getEnvelope())) {
// Mark for disable index switching
$this->isSentToMessenger = true;
$index = $this->indexManager->getIndex($options['indexName']);
if ($index->getOriginalName() !== $index->getName()) {
// If using alias
/** @var AsyncPersistPage $message */
$message = $event->getEnvelope()->getMessage();
// Update envelope and set new index name
$event->setEnvelope(new Envelope(
new AsyncPersistPage(
$message->getPage(),
$message->getOptions() + [
self::OPTION_REAL_INDEX_NAME => $index->getName(),
]
),
array_merge(...array_values($event->getEnvelope()->all()))
));
}
}
}
public function onWorkerMessageReceived(WorkerMessageReceivedEvent $event): void
{
if ($options = $this->getElasticaOptionsFromEnvelope($event->getEnvelope())) {
$index = $this->indexManager->getIndex($options['indexName']);
// Restore original name
$index->overrideName($index->getOriginalName());
if (isset($options[self::OPTION_REAL_INDEX_NAME])) {
// If new index name available, then override name
// We need to write to new index in worker
$index->overrideName($options[self::OPTION_REAL_INDEX_NAME]);
}
}
}
public function onWorkerMessageHandled(WorkerMessageHandledEvent $event): void
{
if ($options = $this->getElasticaOptionsFromEnvelope($event->getEnvelope())) {
// Get transport for envelope
/** @var \Generator $queueName */
$queueName = $this->messengerSendersLocator
->getSenders($event->getEnvelope());
$transport = $queueName->current();
// Check transport can return message count
if (!$transport instanceof MessageCountAwareInterface) {
throw new \RuntimeException('Transport must implement MessageCountAwareInterface');
}
if (0 === $transport->getMessageCount()) {
// If it is last message in queue
// Switch old and new index
$this->resetter->switchIndexAlias(
$options['indexName'],
$options['delete']
);
// Refresh index
$index = $this->indexManager->getIndex($options['indexName']);
$index->refresh();
}
}
}
/**
* @return array{
* max_per_page: int,
* first_page: int,
* last_page: int,
* delete: bool,
* reset: bool,
* ignore_errors: bool,
* sleep: int,
* indexName: string,
* _realIndexName: string|null,
* }|null
*/
private function getElasticaOptionsFromEnvelope(Envelope $envelope): ?array
{
$options = null;
$message = $envelope->getMessage();
if ($message instanceof AsyncPersistPage) {
/** @var array{
* max_per_page: int,
* first_page: int,
* last_page: int,
* delete: bool,
* reset: bool,
* ignore_errors: bool,
* sleep: int,
* indexName: string,
* _realIndexName: string|null,
* } $options */
$options = $message->getOptions();
}
return $options;
}
}
@miroslav-chandler
Copy link

класне рішення)

@vladdnepr
Copy link
Author

@miroslav-chandler Дякую

@miroslav-chandler
Copy link

@vladdnepr глянь лінкед пліз)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment