A symfony command which allow to create all configuration in RabbitMQ according to old_sound_rabbit_mq configuration
<?php | |
namespace AppBundle\Command; | |
use Symfony\Bundle\FrameworkBundle\Command\AbstractConfigCommand; | |
use Symfony\Component\Config\Definition\Processor; | |
use Symfony\Component\Console\Exception\LogicException; | |
use Symfony\Component\Console\Input\InputInterface; | |
use Symfony\Component\Console\Input\InputOption; | |
use Symfony\Component\Console\Output\OutputInterface; | |
use Symfony\Component\Console\Question\Question; | |
use Symfony\Component\DependencyInjection\Extension\Extension; | |
use Symfony\Component\Process\Process; | |
class InitRabbitmqCommand extends AbstractConfigCommand | |
{ | |
/** | |
* @var string | |
*/ | |
private $rabbitmqadmin_commandline; | |
/** | |
* @var string | |
*/ | |
private $rabbitmqadmin_username; | |
/** | |
* @var string | |
*/ | |
private $rabbitmqadmin_password; | |
/** | |
* @var array | |
*/ | |
private $connections = []; | |
/** | |
* @var array | |
*/ | |
private $consumers = []; | |
/** | |
* @var OutputInterface | |
*/ | |
private $output; | |
/** | |
* @var bool | |
*/ | |
private $is_debug = false; | |
/** | |
* {@inheritdoc} | |
*/ | |
public function configure(): void | |
{ | |
$this | |
->setName('init:rabbitmq') | |
->setDescription('Initialize the connections, queues, exchanges for RabbitMQ') | |
->addOption( | |
'rabbitmq-admin-bin', | |
'b', | |
InputOption::VALUE_REQUIRED, | |
'The path of the rabbitmqadmin binary', | |
'/usr/bin/rabbitmqadmin' | |
) | |
->addOption( | |
'debug', | |
'd', | |
InputOption::VALUE_NONE, | |
'Show debug message' | |
) | |
; | |
} | |
/** | |
* {@inheritdoc} | |
*/ | |
public function execute(InputInterface $input, OutputInterface $output): void | |
{ | |
$this->is_debug = $input->getOption('debug'); | |
// Ask the username and password for an RabbitMQ admin user | |
$this->askInformations($input, $output); | |
// Initialize the configuration | |
$this->initConfiguration(); | |
// Foreach consumer, check and create the requirements if necessary | |
$this->rabbitmqadmin_commandline = $input->getOption('rabbitmq-admin-bin'); | |
$this->output = $output; | |
foreach ($this->consumers as $consumer) { | |
$this->processConsumer($consumer); | |
$this->output->writeln('<comment>----------------</comment>'); | |
} | |
} | |
/** | |
* Check and configure all requirements for consumer. | |
* | |
* @param $consumer | |
*/ | |
private function processConsumer($consumer): void | |
{ | |
$this->output->writeln(sprintf('<comment>Process consumer "%s"</comment>', $consumer['name'])); | |
$connection = $consumer['connection']; | |
// Check and process the RabbitMQ user | |
$user = $this->connections[$connection]['user']; | |
$this->output->writeln(sprintf('<comment>Process user "%s"</comment>', $user)); | |
if (false === $this->isObjectExists('users', $user, $connection)) { | |
$this->declareUser($connection); | |
} else { | |
$this->output->writeln(sprintf('<info>The user "%s" already exist</info>', $user)); | |
} | |
// Check and process the vhost | |
$this->processVhost($connection); | |
// Check and process permissions for user | |
$this->processPermissions($connection); | |
// Check and process the RabbitMQ queue | |
$queue = $consumer['queue']['name']; | |
$this->output->writeln(sprintf('<comment>Process queue "%s"</comment>', $queue)); | |
if (false === $this->isObjectExists('queues', $queue, $connection, true)) { | |
$this->declareQueue($connection, $consumer['queue']); | |
} else { | |
$this->output->writeln(sprintf('<info>The queue "%s" already exist</info>', $queue)); | |
} | |
// Check and process the RabbitMQ exchange | |
$exchange = $consumer['exchange']['name']; | |
$this->output->writeln(sprintf('<comment>Process exchange "%s"</comment>', $exchange)); | |
if (false === $this->isObjectExists('exchanges', $exchange, $connection, true)) { | |
$this->declareExchange($connection, $consumer['exchange']); | |
} else { | |
$this->output->writeln(sprintf('<info>The exchange "%s" already exist</info>', $exchange)); | |
} | |
// Check and process the RabbitMQ binding | |
$this->processBinding($connection, $consumer); | |
} | |
/** | |
* Declare the user configured for the connection. | |
* | |
* @param string $connection | |
*/ | |
private function declareUser(string $connection): void | |
{ | |
$user = $this->connections[$connection]['user']; | |
$password = $this->connections[$connection]['password']; | |
$this->executeRabbitMqAdminCommand('declare', 'user', $connection, [ | |
'name' => $user, | |
'password' => $password, | |
'tags' => '', | |
]); | |
$this->output->writeln(sprintf('<info>The user "%s" is created</info>', $user)); | |
} | |
/** | |
* Declare the vhost configured for the connection. | |
* | |
* @param string $connection | |
*/ | |
private function processVhost(string $connection): void | |
{ | |
$vhost = $this->connections[$connection]['vhost']; | |
$this->output->writeln(sprintf('<comment>Process vhost "%s"</comment>', $vhost)); | |
if (true === $this->isObjectExists('vhosts', $vhost, $connection)) { | |
$this->output->writeln(sprintf('<info>The vhost "%s" already exists</info>', $vhost)); | |
return; | |
} | |
$this->executeRabbitMqAdminCommand('declare', 'vhost', $connection, [ | |
'name' => $vhost, | |
]); | |
$this->output->writeln(sprintf('<info>The vhost "%s" is created</info>', $vhost)); | |
} | |
/** | |
* Declare the queue. | |
* | |
* @param string $connection | |
* @param array $queue | |
*/ | |
private function declareQueue(string $connection, array $queue): void | |
{ | |
$this->executeRabbitMqAdminCommand('declare', 'queue', $connection, [ | |
'name' => $queue['name'], | |
'--vhost' => $this->connections[$connection]['vhost'], | |
'auto_delete' => $queue['auto_delete'] ? 'true' : 'false', | |
'durable' => $queue['durable'] ? 'true' : 'false', | |
]); | |
$this->output->writeln(sprintf('<info>The queue "%s" is created</info>', $queue['name'])); | |
} | |
/** | |
* Declare the queue. | |
* | |
* @param string $connection | |
* @param array $exchange | |
*/ | |
private function declareExchange(string $connection, array $exchange): void | |
{ | |
$this->executeRabbitMqAdminCommand('declare', 'exchange', $connection, [ | |
'name' => $exchange['name'], | |
'type' => $exchange['type'], | |
'--vhost' => $this->connections[$connection]['vhost'], | |
'auto_delete' => $exchange['auto_delete'] ? 'true' : 'false', | |
'durable' => $exchange['durable'] ? 'true' : 'false', | |
'internal' => $exchange['internal'] ? 'true' : 'false', | |
]); | |
$this->output->writeln(sprintf('<info>The exchange "%s" is created</info>', $exchange['name'])); | |
} | |
/** | |
* Check and declare permission for user and vhost. | |
* | |
* @param string $connection | |
*/ | |
private function processPermissions(string $connection): void | |
{ | |
$vhost = $this->connections[$connection]['vhost']; | |
foreach ([$this->connections[$connection]['user'], $this->rabbitmqadmin_username] as $user) { | |
$this->output->writeln(sprintf( | |
'<comment>Process permission for user "%s" to "%s"</comment>', | |
$user, | |
$vhost | |
)); | |
$output = $this->executeRabbitMqAdminCommand('list', 'permissions', $connection, [ | |
'user', 'vhost', 'configure', 'read', 'write', // It's the column order | |
]); | |
$regex = '/'. | |
'^\|\s*'.preg_quote($user, '/').'\s*'. | |
'\|\s*'.preg_quote($vhost, '/').'\s*'. | |
'\|\s*'.preg_quote('.*', '/').'\s*'. | |
'\|\s*'.preg_quote('.*', '/').'\s*'. | |
'\|\s*'.preg_quote('.*', '/').'\s*\|'. | |
'/mU' | |
; | |
if (1 === preg_match($regex, $output)) { | |
$this->output->writeln(sprintf( | |
'<info>Permission for user "%s" to "%s" is already setting</info>', | |
$user, | |
$vhost | |
)); | |
continue; | |
} | |
$this->executeRabbitMqAdminCommand('declare', 'permission', $connection, [ | |
'vhost' => $vhost, | |
'user' => $user, | |
'configure' => '.*', | |
'write' => '.*', | |
'read' => '.*', | |
]); | |
$this->output->writeln(sprintf( | |
'<info>Permission for user "%s" to "%s" is setting</info>', | |
$user, | |
$vhost | |
)); | |
} | |
} | |
private function processBinding(string $connection, array $consumer): void | |
{ | |
$source = $consumer['exchange']['name']; | |
$destination = $consumer['queue']['name']; | |
$this->output->writeln(sprintf( | |
'<comment>Process binding for "%s" to "%s"</comment>', | |
$source, | |
$destination | |
)); | |
$output = $this->executeRabbitMqAdminCommand('list', 'bindings', $connection, [ | |
'source', 'destination', // It's the column order | |
'--vhost' => $this->connections[$connection]['vhost'], | |
]); | |
$regex = '/'. | |
'^\|\s*'.preg_quote($source, '/').'\s*'. | |
'\|\s*'.preg_quote($destination, '/').'\s*\|'. | |
'/mU' | |
; | |
if (1 === preg_match($regex, $output)) { | |
$this->output->writeln(sprintf( | |
'<info>Binding for "%s" to "%s" already exists</info>', | |
$source, | |
$destination | |
)); | |
return; | |
} | |
$this->executeRabbitMqAdminCommand('declare', 'binding', $connection, [ | |
'--vhost' => $this->connections[$connection]['vhost'], | |
'source' => $source, | |
'destination' => $destination, | |
]); | |
$this->output->writeln(sprintf( | |
'<info>Binding for "%s" to "%s" is created</info>', | |
$source, | |
$destination | |
)); | |
} | |
/** | |
* Check if an object exists in RabbitMQ. | |
* | |
* @param string $object | |
* @param string $value | |
* @param string $connection | |
* @param bool $use_vhost | |
* | |
* @return bool | |
*/ | |
private function isObjectExists(string $object, string $value, string $connection, bool $use_vhost = false): bool | |
{ | |
$arguments = [ | |
'name', | |
]; | |
if (true === $use_vhost) { | |
$arguments['--vhost'] = $this->connections[$connection]['vhost']; | |
} | |
$output = $this->executeRabbitMqAdminCommand('list', $object, $connection, $arguments); | |
return 1 === preg_match('/^\|\s*'.preg_quote($value, '/').'\s*\|/mU', $output); | |
} | |
/** | |
* Execute a RabbitMQ command. | |
* | |
* @param string $type | |
* @param string $object | |
* @param string $connection_name | |
* @param array $arguments | |
* | |
* @return string | |
*/ | |
private function executeRabbitMqAdminCommand( | |
string $type, | |
string $object, | |
string $connection_name, | |
array $arguments = [] | |
): string { | |
$process_datas = [ | |
$this->rabbitmqadmin_commandline, | |
$type, | |
$object, | |
'-H', $this->connections[$connection_name]['host'], | |
'-P', $this->connections[$connection_name]['port'], | |
'-u', $this->rabbitmqadmin_username, | |
'-p', $this->rabbitmqadmin_password, | |
]; | |
foreach ($arguments as $name => $value) { | |
if (true === is_numeric($name)) { | |
$process_datas[] = $value; | |
} else { | |
$process_datas[] = $name.'='.$value; | |
} | |
} | |
$process = new Process($process_datas); | |
if (true === $this->is_debug) { | |
$this->output->writeln(sprintf('<error>DEBUG : %s</error>', $process->getCommandLine())); | |
} | |
$process->run(); | |
if (false === $process->isSuccessful()) { | |
throw new \RuntimeException($process->getErrorOutput()); | |
} | |
return $process->getOutput(); | |
} | |
/** | |
* Ask to the user: the username and password of the RabbitMQ admin. | |
* | |
* @param InputInterface $input | |
* @param OutputInterface $output | |
*/ | |
public function askInformations(InputInterface $input, OutputInterface $output): void | |
{ | |
$helper = $this->getHelper('question'); | |
// Admin Username | |
$question = new Question('Please enter the admin username for RabbitMQ: '); | |
$question->setValidator(function ($answer) { | |
if (false === \is_string($answer) || true === empty($answer)) { | |
throw new \RuntimeException( | |
'The username of the admin RabbitMQ is required' | |
); | |
} | |
return $answer; | |
}); | |
$this->rabbitmqadmin_username = $helper->ask($input, $output, $question); | |
// Admin password | |
$question = new Question('Please enter the admin password for RabbitMQ: '); | |
$question->setValidator(function ($answer) { | |
if (false === \is_string($answer) || true === empty($answer)) { | |
throw new \RuntimeException( | |
'The password of the admin RabbitMQ is required' | |
); | |
} | |
return $answer; | |
}); | |
$question->setHidden(true); | |
$this->rabbitmqadmin_password = $helper->ask($input, $output, $question); | |
} | |
/** | |
* Read the RabbitMQ configuration and initialize the connections and consumers informations. | |
*/ | |
private function initConfiguration(): void | |
{ | |
/** @var Extension $extension */ | |
$extension = $this->findExtension('old_sound_rabbit_mq'); | |
$container = $this->compileContainer(); | |
$extensionAlias = $extension->getAlias(); | |
$configs = $container->getExtensionConfig($extensionAlias); | |
$configuration = $extension->getConfiguration($configs, $container); | |
if (null === $configuration) { | |
throw new LogicException('Configuration cannot be null'); | |
} | |
$this->validateConfiguration($extension, $configuration); | |
$configs = $container->resolveEnvPlaceholders($container->getParameterBag()->resolveValue($configs)); | |
$processor = new Processor(); | |
$config = $container->resolveEnvPlaceholders($container->getParameterBag()->resolveValue($processor->processConfiguration($configuration, $configs))); | |
$this->initConnections($config); | |
$this->initConsumers($config); | |
} | |
/** | |
* Initialize the connections configuration. | |
* | |
* @param array $config The RabbitMQ configuration | |
*/ | |
private function initConnections(array $config): void | |
{ | |
foreach ($config['connections'] as $name => $datas) { | |
$url_datas = []; | |
if (false === empty($datas['url'])) { | |
$url_datas = parse_url($datas['url']); | |
} | |
$host = $url_datas['host'] ?? $datas['host']; | |
$port = $url_datas['port'] ?? $datas['port']; | |
$user = $url_datas['user'] ?? $datas['user']; | |
$password = $url_datas['pass'] ?? $datas['password']; | |
$vhost = $url_datas['path'] ?? $datas['vhost']; | |
$this->connections[$name] = [ | |
'host' => $host, | |
'port' => $port, | |
'user' => $user, | |
'password' => $password, | |
'vhost' => $vhost, | |
]; | |
} | |
} | |
/** | |
* Initialize the consumers configuration. | |
* | |
* @param array $config The RabbitMQ configuration | |
*/ | |
private function initConsumers(array $config): void | |
{ | |
foreach ($config['consumers'] as $name => $datas) { | |
$this->consumers[$name] = [ | |
'name' => $name, | |
'connection' => $datas['connection'], | |
'exchange' => $datas['exchange_options'], | |
'queue' => $datas['queue_options'], | |
]; | |
} | |
} | |
private function compileContainer() | |
{ | |
$kernel = clone $this->getApplication()->getKernel(); | |
$kernel->boot(); | |
$method = new \ReflectionMethod($kernel, 'buildContainer'); | |
$method->setAccessible(true); | |
$container = $method->invoke($kernel); | |
$container->getCompiler()->compile($container); | |
return $container; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment