Skip to content

Instantly share code, notes, and snippets.

@sashaaro
Last active May 20, 2023 10:10
Show Gist options
  • Save sashaaro/9642eb99c9c22893496172e49eae518b to your computer and use it in GitHub Desktop.
Save sashaaro/9642eb99c9c22893496172e49eae518b to your computer and use it in GitHub Desktop.
Consume all queues with a single command, using Symfony 4.4 and OldSoundRabbitMqBundle, to use memory efficiently in a test environment
<?php
declare(strict_types=1);
namespace App\Command;
use Monolog\Handler\StreamHandler;
use Monolog\Logger;
use OldSound\RabbitMqBundle\RabbitMq\BaseAmqp;
use OldSound\RabbitMqBundle\RabbitMq\Consumer;
use PhpAmqpLib\Connection\AbstractConnection;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\HttpKernel\DependencyInjection\ServicesResetter;
/**
 * Console command that consumes every injected RabbitMQ consumer's queue
 * from a single process, over one shared channel.
 *
 * Each consumer first declares its own exchanges/queues, then all queues are
 * subscribed on one channel and messages are dispatched to the owning
 * consumer's processMessage(). Container services are reset after every
 * message.
 */
class CombineQueuesConsumCommand extends Command
{
    /** @var AbstractConnection */
    protected $connection;

    /** @var iterable|Consumer[] */
    protected $consumers;

    /** @var ServicesResetter */
    protected $servicesResetter;

    /**
     * @param AbstractConnection  $connection       Connection used to open the shared consuming channel.
     * @param iterable|Consumer[] $consumers        Consumers whose queues are consumed together.
     * @param ServicesResetter    $servicesResetter Resets container services after each processed message.
     */
    public function __construct(
        AbstractConnection $connection,
        iterable $consumers,
        ServicesResetter $servicesResetter
    ) {
        $this->connection = $connection;
        $this->consumers = $consumers;
        $this->servicesResetter = $servicesResetter;

        parent::__construct();
    }

    /**
     * Registers the command name and description.
     */
    protected function configure(): void
    {
        $this->setName('app:consume-combine-queues')
            ->setDescription('Consume all queue');
    }

    /**
     * Lets every consumer declare its exchanges and queues, tags it with its
     * iteration key, and then closes the consumer again.
     */
    private function declareExchangsAndQueues(): void
    {
        foreach ($this->consumers as $key => $consumer) {
            // The iterator key may be an int; consumer tags are passed on as strings.
            $consumer->setConsumerTag((string) $key);
            $consumer->setupFabric();
            $consumer->close();
        }
    }

    /**
     * Subscribes all consumers' queues on one channel and processes messages
     * until the channel has no callbacks left.
     *
     * @param InputInterface  $input  Unused.
     * @param OutputInterface $output Unused.
     *
     * @return int Exit code; 0 once the consume loop ends normally.
     */
    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $this->declareExchangsAndQueues();

        $channel = $this->connection->channel();
        // prefetch_size = 0, prefetch_count = 10, global flag = true.
        $channel->basic_qos(0, 10, true);

        // TODO extract queue name by better way
        // queueOptions is not publicly readable on the consumer, so it is read via reflection.
        $queueOptionsProperty = new \ReflectionProperty(BaseAmqp::class, 'queueOptions');
        $queueOptionsProperty->setAccessible(true);

        $servicesResetter = $this->servicesResetter;

        foreach ($this->consumers as $consumer) {
            $queueOptions = $queueOptionsProperty->getValue($consumer);

            // vendor/php-amqplib/rabbitmq-bundle/RabbitMq/BaseConsumer.php setupConsumer
            $channel->basic_consume(
                $queueOptions['name'],
                $consumer->getConsumerTag(),
                false, // no_local
                false, // no_ack
                false, // exclusive
                false, // nowait
                static function ($msg) use ($consumer, $servicesResetter) {
                    $consumer->processMessage($msg);
                    $servicesResetter->reset();
                }
            );
        }

        // Consumer logging is not configured here; prefer injecting a logger
        // into the consumers via services.yml.

        // consume
        // vendor/php-amqplib/rabbitmq-bundle/RabbitMq/BaseConsumer.php consumer
        // NOTE(review): wait() is called with a timeout of 500 and no timeout
        // handling, so an idle period of that length presumably ends the
        // command with an exception - confirm this is intended.
        while (count($channel->callbacks)) {
            $channel->wait(null, false, 500);
        }

        return 0;
    }
}
# Service definition for the combined-queue consumer command (goes under `services:`).
# The service id must match the declared class name, App\Command\CombineQueuesConsumCommand.
App\Command\CombineQueuesConsumCommand:
    arguments:
        # Connection used to open the shared consuming channel.
        - "@old_sound_rabbit_mq.connection.default"
        # Every service tagged as a consumer, injected as an iterable.
        - !tagged_iterator old_sound_rabbit_mq.consumer
        # Resets container services after each processed message.
        - "@services_resetter"
    tags: ['console.command']
@sashaaro
Copy link
Author

sashaaro commented Nov 8, 2020

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment