Last active
May 20, 2023 10:10
-
-
Save sashaaro/9642eb99c9c22893496172e49eae518b to your computer and use it in GitHub Desktop.
Consume all queues with a single command, using Symfony 4.4 and OldSoundRabbitMqBundle, to reduce memory usage in the test environment
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
declare(strict_types=1); | |
namespace App\Command; | |
use Monolog\Handler\StreamHandler; | |
use Monolog\Logger; | |
use OldSound\RabbitMqBundle\RabbitMq\BaseAmqp; | |
use OldSound\RabbitMqBundle\RabbitMq\Consumer; | |
use PhpAmqpLib\Connection\AbstractConnection; | |
use Symfony\Component\Console\Command\Command; | |
use Symfony\Component\Console\Input\InputInterface; | |
use Symfony\Component\Console\Output\OutputInterface; | |
use Symfony\Component\HttpKernel\DependencyInjection\ServicesResetter; | |
/**
 * Console command that consumes the queues of every injected consumer on a
 * single AMQP channel, instead of running one process per consumer.
 *
 * After each processed message the injected ServicesResetter is invoked.
 */
class CombineQueuesConsumCommand extends Command
{
    /** @var AbstractConnection */
    protected $connection;

    /** @var iterable|Consumer[] */
    protected $consumers;

    /** @var ServicesResetter */
    protected $servicesResetter;

    /**
     * @param AbstractConnection  $connection       Connection on which the shared channel is opened.
     * @param iterable|Consumer[] $consumers        Consumers whose queues are consumed together.
     * @param ServicesResetter    $servicesResetter Invoked after every processed message.
     */
    public function __construct(
        AbstractConnection $connection,
        iterable $consumers,
        ServicesResetter $servicesResetter
    ) {
        $this->connection = $connection;
        $this->consumers = $consumers;
        $this->servicesResetter = $servicesResetter;

        parent::__construct();
    }

    protected function configure(): void
    {
        $this
            ->setName('app:consume-combine-queues')
            ->setDescription('Consume all queue');
    }

    /**
     * Assigns a consumer tag to every consumer, lets it set up its fabric,
     * then closes the consumer again.
     */
    private function declareExchangsAndQueues(): void
    {
        foreach ($this->consumers as $index => $consumer) {
            // Iterator keys may be integers; the tag is later handed to
            // basic_consume() as the consumer tag, so normalise it to a string.
            $consumer->setConsumerTag((string) $index);
            $consumer->setupFabric();
            $consumer->close();
        }
    }

    /**
     * Subscribes every consumer's queue on one shared channel and dispatches
     * incoming messages until the channel has no callbacks left.
     *
     * @param InputInterface  $input
     * @param OutputInterface $output
     *
     * @return int Exit status; 0 once the wait loop ends.
     */
    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $this->declareExchangsAndQueues();

        $channel = $this->connection->channel();
        // prefetch_size = 0, prefetch_count = 10, global flag = true.
        $channel->basic_qos(0, 10, true);

        // TODO extract queue name by better way
        // queueOptions is not publicly readable on the consumer, hence reflection.
        $queueOptionsProperty = (new \ReflectionClass(BaseAmqp::class))->getProperty('queueOptions');
        $queueOptionsProperty->setAccessible(true);

        $servicesResetter = $this->servicesResetter;

        foreach ($this->consumers as $consumer) {
            $queueOptions = $queueOptionsProperty->getValue($consumer);

            // Mirrors vendor/php-amqplib/rabbitmq-bundle/RabbitMq/BaseConsumer.php setupConsumer
            $channel->basic_consume(
                $queueOptions['name'],
                $consumer->getConsumerTag(),
                false, // no_local
                false, // no_ack
                false, // exclusive
                false, // nowait
                static function ($message) use ($consumer, $servicesResetter) {
                    $consumer->processMessage($message);
                    $servicesResetter->reset();
                }
            );
        }

        // Mirrors vendor/php-amqplib/rabbitmq-bundle/RabbitMq/BaseConsumer.php consume
        // NOTE(review): the third argument is the wait timeout (500); confirm
        // that the timeout behaviour of wait() is the intended way to stop.
        while (count($channel->callbacks) > 0) {
            $channel->wait(null, false, 500);
        }

        return 0;
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Service id must match the declared class name (CombineQueuesConsumCommand).
# NOTE(review): if the class is meant to be spelled "Consume", rename the
# class and its file instead and keep this id in sync.
App\Command\CombineQueuesConsumCommand:
    arguments:
        - "@old_sound_rabbit_mq.connection.default"
        # Every service tagged as an OldSound RabbitMQ consumer.
        - !tagged_iterator old_sound_rabbit_mq.consumer
        - "@services_resetter"
    tags: ['console.command']
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
eMAGTechLabs/RabbitMqBundle#38