Created
May 20, 2018 16:05
-
-
Save leblanc-simon/37bad2b4db9130bb27eabc252ca11790 to your computer and use it in GitHub Desktop.
A Symfony command that creates all the required configuration in RabbitMQ according to the old_sound_rabbit_mq configuration
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
namespace AppBundle\Command; | |
use Symfony\Bundle\FrameworkBundle\Command\AbstractConfigCommand; | |
use Symfony\Component\Config\Definition\Processor; | |
use Symfony\Component\Console\Exception\LogicException; | |
use Symfony\Component\Console\Input\InputInterface; | |
use Symfony\Component\Console\Input\InputOption; | |
use Symfony\Component\Console\Output\OutputInterface; | |
use Symfony\Component\Console\Question\Question; | |
use Symfony\Component\DependencyInjection\Extension\Extension; | |
use Symfony\Component\Process\Process; | |
class InitRabbitmqCommand extends AbstractConfigCommand | |
{ | |
/**
 * Path of the rabbitmqadmin executable, read from the "rabbitmq-admin-bin" option.
 *
 * @var string
 */
private $rabbitmqadmin_commandline;

/**
 * Username of the RabbitMQ admin account, asked interactively.
 *
 * @var string
 */
private $rabbitmqadmin_username;

/**
 * Password of the RabbitMQ admin account, asked interactively.
 *
 * @var string
 */
private $rabbitmqadmin_password;

/**
 * Connection settings (host, port, user, password, vhost) indexed by connection name.
 *
 * @var array
 */
private $connections = [];

/**
 * Consumer settings (name, connection, exchange, queue) indexed by consumer name.
 *
 * @var array
 */
private $consumers = [];

/**
 * Console output used to report progress.
 *
 * @var OutputInterface
 */
private $output;

/**
 * Whether every executed rabbitmqadmin command line is printed.
 *
 * @var bool
 */
private $is_debug = false;
/**
 * Registers the command name, its description and its two options.
 *
 * {@inheritdoc}
 */
public function configure(): void
{
    $this->setName('init:rabbitmq');
    $this->setDescription('Initialize the connections, queues, exchanges for RabbitMQ');

    // Location of the rabbitmqadmin executable used to run every command
    $this->addOption(
        'rabbitmq-admin-bin',
        'b',
        InputOption::VALUE_REQUIRED,
        'The path of the rabbitmqadmin binary',
        '/usr/bin/rabbitmqadmin'
    );

    // Flag that makes the command print each executed command line
    $this->addOption('debug', 'd', InputOption::VALUE_NONE, 'Show debug message');
}
/**
 * Runs the initialization: asks for the admin credentials, loads the bundle
 * configuration, then checks and creates the requirements of every consumer.
 *
 * {@inheritdoc}
 *
 * @return int The command exit code (0 on success)
 */
public function execute(InputInterface $input, OutputInterface $output): int
{
    // Set the shared state first so that every helper can rely on it
    $this->is_debug = (bool) $input->getOption('debug');
    $this->output = $output;
    $this->rabbitmqadmin_commandline = $input->getOption('rabbitmq-admin-bin');

    // Ask the username and password for a RabbitMQ admin user
    $this->askInformations($input, $output);

    // Initialize the configuration
    $this->initConfiguration();

    // For each consumer, check and create the requirements if necessary
    foreach ($this->consumers as $consumer) {
        $this->processConsumer($consumer);
        $this->output->writeln('<comment>----------------</comment>');
    }

    // The Console component expects an integer exit code: returning nothing
    // is deprecated since Symfony 4.4 and raises a TypeError from Symfony 5.0
    return 0;
}
/**
 * Checks and creates everything one consumer needs, in this order:
 * user, vhost, permissions, queue, exchange and finally the binding.
 *
 * @param array $consumer One entry of $this->consumers (name, connection, exchange, queue)
 */
private function processConsumer($consumer): void
{
    $this->output->writeln(sprintf('<comment>Process consumer "%s"</comment>', $consumer['name']));

    $connection = $consumer['connection'];

    // RabbitMQ user of the connection
    $user = $this->connections[$connection]['user'];
    $this->output->writeln(sprintf('<comment>Process user "%s"</comment>', $user));
    if ($this->isObjectExists('users', $user, $connection)) {
        $this->output->writeln(sprintf('<info>The user "%s" already exist</info>', $user));
    } else {
        $this->declareUser($connection);
    }

    // Vhost, then the permissions on it
    $this->processVhost($connection);
    $this->processPermissions($connection);

    // RabbitMQ queue (looked up inside the vhost)
    $queue_options = $consumer['queue'];
    $queue = $queue_options['name'];
    $this->output->writeln(sprintf('<comment>Process queue "%s"</comment>', $queue));
    if ($this->isObjectExists('queues', $queue, $connection, true)) {
        $this->output->writeln(sprintf('<info>The queue "%s" already exist</info>', $queue));
    } else {
        $this->declareQueue($connection, $queue_options);
    }

    // RabbitMQ exchange (looked up inside the vhost)
    $exchange_options = $consumer['exchange'];
    $exchange = $exchange_options['name'];
    $this->output->writeln(sprintf('<comment>Process exchange "%s"</comment>', $exchange));
    if ($this->isObjectExists('exchanges', $exchange, $connection, true)) {
        $this->output->writeln(sprintf('<info>The exchange "%s" already exist</info>', $exchange));
    } else {
        $this->declareExchange($connection, $exchange_options);
    }

    // Binding between the exchange and the queue
    $this->processBinding($connection, $consumer);
}
/**
 * Creates the RabbitMQ user configured for the connection, without any tag.
 *
 * @param string $connection Name of the connection holding the user and password
 */
private function declareUser(string $connection): void
{
    $settings = $this->connections[$connection];

    $arguments = [
        'name' => $settings['user'],
        'password' => $settings['password'],
        'tags' => '',
    ];
    $this->executeRabbitMqAdminCommand('declare', 'user', $connection, $arguments);

    $this->output->writeln(sprintf('<info>The user "%s" is created</info>', $settings['user']));
}
/**
 * Creates the vhost configured for the connection unless it is already listed.
 *
 * @param string $connection Name of the connection holding the vhost
 */
private function processVhost(string $connection): void
{
    $vhost = $this->connections[$connection]['vhost'];
    $this->output->writeln(sprintf('<comment>Process vhost "%s"</comment>', $vhost));

    if (false === $this->isObjectExists('vhosts', $vhost, $connection)) {
        $this->executeRabbitMqAdminCommand('declare', 'vhost', $connection, ['name' => $vhost]);
        $message = sprintf('<info>The vhost "%s" is created</info>', $vhost);
    } else {
        $message = sprintf('<info>The vhost "%s" already exists</info>', $vhost);
    }

    $this->output->writeln($message);
}
/**
 * Creates the queue inside the vhost of the connection.
 *
 * @param string $connection Name of the connection holding the vhost
 * @param array  $queue      Queue options; "name", "auto_delete" and "durable" are read
 */
private function declareQueue(string $connection, array $queue): void
{
    $name = $queue['name'];

    // Flags are passed as the literal words "true" / "false"
    $arguments = [
        'name' => $name,
        '--vhost' => $this->connections[$connection]['vhost'],
        'auto_delete' => var_export((bool) $queue['auto_delete'], true),
        'durable' => var_export((bool) $queue['durable'], true),
    ];
    $this->executeRabbitMqAdminCommand('declare', 'queue', $connection, $arguments);

    $this->output->writeln(sprintf('<info>The queue "%s" is created</info>', $name));
}
/**
 * Creates the exchange inside the vhost of the connection.
 *
 * @param string $connection Name of the connection holding the vhost
 * @param array  $exchange   Exchange options; "name", "type", "auto_delete",
 *                           "durable" and "internal" are read
 */
private function declareExchange(string $connection, array $exchange): void
{
    $name = $exchange['name'];

    // Flags are passed as the literal words "true" / "false"
    $arguments = [
        'name' => $name,
        'type' => $exchange['type'],
        '--vhost' => $this->connections[$connection]['vhost'],
        'auto_delete' => var_export((bool) $exchange['auto_delete'], true),
        'durable' => var_export((bool) $exchange['durable'], true),
        'internal' => var_export((bool) $exchange['internal'], true),
    ];
    $this->executeRabbitMqAdminCommand('declare', 'exchange', $connection, $arguments);

    $this->output->writeln(sprintf('<info>The exchange "%s" is created</info>', $name));
}
/**
 * Grants full permissions (".*" for configure, write and read) on the vhost of
 * the connection to both the connection user and the admin user, unless the
 * permission listing already shows them.
 *
 * @param string $connection Name of the connection holding the user and vhost
 */
private function processPermissions(string $connection): void
{
    $vhost = $this->connections[$connection]['vhost'];
    $users = [$this->connections[$connection]['user'], $this->rabbitmqadmin_username];

    // Turns a raw value into a table cell pattern that tolerates padding
    $cell = static function ($value) {
        return '\s*'.preg_quote($value, '/').'\s*';
    };

    foreach ($users as $user) {
        $this->output->writeln(sprintf(
            '<comment>Process permission for user "%s" to "%s"</comment>',
            $user,
            $vhost
        ));

        // The requested columns fix the column order of the listing
        $listing = $this->executeRabbitMqAdminCommand('list', 'permissions', $connection, [
            'user', 'vhost', 'configure', 'read', 'write',
        ]);

        // Expected row: | user | vhost | .* | .* | .* |
        $cells = array_map($cell, [$user, $vhost, '.*', '.*', '.*']);
        $regex = '/^\|'.implode('\|', $cells).'\|/mU';

        if (1 !== preg_match($regex, $listing)) {
            $this->executeRabbitMqAdminCommand('declare', 'permission', $connection, [
                'vhost' => $vhost,
                'user' => $user,
                'configure' => '.*',
                'write' => '.*',
                'read' => '.*',
            ]);
            $message = '<info>Permission for user "%s" to "%s" is setting</info>';
        } else {
            $message = '<info>Permission for user "%s" to "%s" is already setting</info>';
        }

        $this->output->writeln(sprintf($message, $user, $vhost));
    }
}
/**
 * Check and declare the binding(s) between the exchange and the queue of a consumer.
 *
 * When the queue options contain a non-empty "routing_keys" list, one binding
 * is ensured per routing key. Otherwise a single binding without routing key
 * is ensured.
 *
 * @param string $connection Name of the connection holding the vhost
 * @param array  $consumer   Consumer entry; "exchange" and "queue" options are read
 */
private function processBinding(string $connection, array $consumer): void
{
    $source = $consumer['exchange']['name'];
    $destination = $consumer['queue']['name'];

    // NOTE(review): "routing_keys" is assumed to be the list of routing keys
    // exposed by the bundle's queue options - confirm against the bundle version in use
    $routing_keys = $consumer['queue']['routing_keys'] ?? [];
    if (false === \is_array($routing_keys) || [] === $routing_keys) {
        // No routing key configured: ensure one binding without routing key
        $routing_keys = [null];
    }

    foreach ($routing_keys as $routing_key) {
        $this->ensureBinding(
            $connection,
            $source,
            $destination,
            null === $routing_key ? null : (string) $routing_key
        );
    }
}

/**
 * Declare one binding unless the binding listing already contains it.
 *
 * @param string      $connection  Name of the connection holding the vhost
 * @param string      $source      Name of the exchange
 * @param string      $destination Name of the queue
 * @param string|null $routing_key Routing key of the binding, null for none
 */
private function ensureBinding(string $connection, string $source, string $destination, ?string $routing_key): void
{
    $vhost = $this->connections[$connection]['vhost'];
    $with_key = null !== $routing_key;
    $suffix = $with_key ? sprintf(' with routing key "%s"', $routing_key) : '';

    $this->output->writeln(sprintf(
        '<comment>Process binding for "%s" to "%s"%s</comment>',
        $source,
        $destination,
        $suffix
    ));

    // The requested columns fix the column order of the listing
    $columns = $with_key ? ['source', 'destination', 'routing_key'] : ['source', 'destination'];
    $columns['--vhost'] = $vhost;
    $output = $this->executeRabbitMqAdminCommand('list', 'bindings', $connection, $columns);

    $regex = '/'.
        '^\|\s*'.preg_quote($source, '/').'\s*'.
        '\|\s*'.preg_quote($destination, '/').'\s*\|'
    ;
    if ($with_key) {
        $regex .= '\s*'.preg_quote($routing_key, '/').'\s*\|';
    }
    $regex .= '/mU';

    if (1 === preg_match($regex, $output)) {
        $this->output->writeln(sprintf(
            '<info>Binding for "%s" to "%s"%s already exists</info>',
            $source,
            $destination,
            $suffix
        ));

        return;
    }

    $arguments = [
        '--vhost' => $vhost,
        'source' => $source,
        'destination' => $destination,
    ];
    if ($with_key) {
        $arguments['routing_key'] = $routing_key;
    }
    $this->executeRabbitMqAdminCommand('declare', 'binding', $connection, $arguments);

    $this->output->writeln(sprintf(
        '<info>Binding for "%s" to "%s"%s is created</info>',
        $source,
        $destination,
        $suffix
    ));
}
/**
 * Tells whether the "name" column of a rabbitmqadmin listing contains the value.
 *
 * @param string $object     Kind of object to list ("users", "vhosts", "queues", ...)
 * @param string $value      Name to look for
 * @param string $connection Name of the connection used to run the listing
 * @param bool   $use_vhost  Whether the listing is restricted to the vhost of the connection
 *
 * @return bool True when a row "| <value> |" is found in the listing
 */
private function isObjectExists(string $object, string $value, string $connection, bool $use_vhost = false): bool
{
    // Only the "name" column is requested, so a row holds a single cell
    $arguments = $use_vhost
        ? ['name', '--vhost' => $this->connections[$connection]['vhost']]
        : ['name'];

    $listing = $this->executeRabbitMqAdminCommand('list', $object, $connection, $arguments);
    $pattern = sprintf('/^\|\s*%s\s*\|/mU', preg_quote($value, '/'));

    return 1 === preg_match($pattern, $listing);
}
/**
 * Execute a rabbitmqadmin command and return its standard output.
 *
 * Arguments with a numeric key are appended as-is; arguments with a string
 * key are appended as "key=value".
 *
 * NOTE(review): the "-P" value is the port of the configured connection,
 * while rabbitmqadmin talks to the management HTTP API - confirm that the
 * configured port is the one rabbitmqadmin must use.
 *
 * @param string $type            Action to run ("list", "declare", ...)
 * @param string $object          Kind of object the action applies to
 * @param string $connection_name Name of the connection providing host and port
 * @param array  $arguments       Extra command arguments
 *
 * @return string The standard output of the command
 *
 * @throws \RuntimeException When the command does not exit successfully
 */
private function executeRabbitMqAdminCommand(
    string $type,
    string $object,
    string $connection_name,
    array $arguments = []
): string {
    $base_arguments = [
        $this->rabbitmqadmin_commandline,
        $type,
        $object,
        '-H', $this->connections[$connection_name]['host'],
        '-P', $this->connections[$connection_name]['port'],
        '-u', $this->rabbitmqadmin_username,
    ];

    $extra_arguments = [];
    foreach ($arguments as $name => $value) {
        $extra_arguments[] = true === is_numeric($name) ? $value : $name.'='.$value;
    }

    $process = new Process(array_merge(
        $base_arguments,
        ['-p', $this->rabbitmqadmin_password],
        $extra_arguments
    ));

    if (true === $this->is_debug) {
        // Never print the admin password: show the same command with a masked value
        $masked_process = new Process(array_merge($base_arguments, ['-p', '******'], $extra_arguments));
        $this->output->writeln(sprintf('<error>DEBUG : %s</error>', $masked_process->getCommandLine()));
    }

    $process->run();

    if (false === $process->isSuccessful()) {
        // Fall back to stdout, then to the exit code, so the exception is never empty
        $message = trim($process->getErrorOutput());
        if ('' === $message) {
            $message = trim($process->getOutput());
        }
        if ('' === $message) {
            $message = sprintf(
                'The rabbitmqadmin command "%s %s" failed with exit code %d',
                $type,
                $object,
                $process->getExitCode()
            );
        }

        throw new \RuntimeException($message);
    }

    return $process->getOutput();
}
/**
 * Ask the user for the username and the password of the RabbitMQ admin.
 *
 * Both answers are required. The password prompt is hidden.
 *
 * @param InputInterface  $input
 * @param OutputInterface $output
 *
 * @throws \RuntimeException When an answer is missing or empty
 */
public function askInformations(InputInterface $input, OutputInterface $output): void
{
    $helper = $this->getHelper('question');

    // Builds a validator rejecting anything but a non-empty string.
    // The comparison with '' (instead of empty()) keeps "0" as a valid answer.
    $required = static function (string $error_message): callable {
        return static function ($answer) use ($error_message) {
            if (false === \is_string($answer) || '' === $answer) {
                throw new \RuntimeException($error_message);
            }

            return $answer;
        };
    };

    // Admin username
    $username_validator = $required('The username of the admin RabbitMQ is required');
    $question = new Question('Please enter the admin username for RabbitMQ: ');
    $question->setValidator($username_validator);
    // The helper may return the (null) default without validating it when the
    // input is not interactive, so the answer is validated once more here
    $this->rabbitmqadmin_username = $username_validator($helper->ask($input, $output, $question));

    // Admin password
    $password_validator = $required('The password of the admin RabbitMQ is required');
    $question = new Question('Please enter the admin password for RabbitMQ: ');
    $question->setValidator($password_validator);
    $question->setHidden(true);
    $this->rabbitmqadmin_password = $password_validator($helper->ask($input, $output, $question));
}
/**
 * Loads the processed "old_sound_rabbit_mq" configuration from a freshly
 * compiled container and fills the connections and consumers from it.
 *
 * @throws LogicException When the extension provides no configuration object
 */
private function initConfiguration(): void
{
    /** @var Extension $extension */
    $extension = $this->findExtension('old_sound_rabbit_mq');
    $container = $this->compileContainer();

    $raw_configs = $container->getExtensionConfig($extension->getAlias());
    $configuration = $extension->getConfiguration($raw_configs, $container);
    if (null === $configuration) {
        throw new LogicException('Configuration cannot be null');
    }

    $this->validateConfiguration($extension, $configuration);

    $parameter_bag = $container->getParameterBag();

    // Resolve parameters and env placeholders before and after the processing
    $resolved_configs = $container->resolveEnvPlaceholders($parameter_bag->resolveValue($raw_configs));
    $processed_config = (new Processor())->processConfiguration($configuration, $resolved_configs);
    $config = $container->resolveEnvPlaceholders($parameter_bag->resolveValue($processed_config));

    $this->initConnections($config);
    $this->initConsumers($config);
}
/**
 * Initialize the connections configuration.
 *
 * For each connection, the values found in its "url" (when set) take
 * precedence over the separate host/port/user/password/vhost settings.
 *
 * @param array $config The RabbitMQ configuration
 */
private function initConnections(array $config): void
{
    foreach ($config['connections'] as $name => $datas) {
        $url_datas = [];
        if (false === empty($datas['url'])) {
            // parse_url() returns false on a seriously malformed URL:
            // in that case only the separate settings are used
            $parsed_url = parse_url($datas['url']);
            $url_datas = \is_array($parsed_url) ? $parsed_url : [];
        }

        // URL components are percent-encoded and the path starts with "/":
        // decode them and strip the slash, otherwise "/my_vhost" or "%2f"
        // would be used as the vhost name.
        // NOTE(review): presumably this matches how the bundle reads the URL - confirm
        $host = isset($url_datas['host']) ? urldecode($url_datas['host']) : $datas['host'];
        $port = $url_datas['port'] ?? $datas['port'];
        $user = isset($url_datas['user']) ? urldecode($url_datas['user']) : $datas['user'];
        $password = isset($url_datas['pass']) ? urldecode($url_datas['pass']) : $datas['password'];
        $vhost = isset($url_datas['path'])
            ? urldecode(ltrim($url_datas['path'], '/'))
            : $datas['vhost'];

        $this->connections[$name] = [
            'host' => $host,
            'port' => $port,
            'user' => $user,
            'password' => $password,
            'vhost' => $vhost,
        ];
    }
}
/**
 * Stores, for every configured consumer, its name, its connection name and
 * its exchange and queue options.
 *
 * @param array $config The RabbitMQ configuration
 */
private function initConsumers(array $config): void
{
    foreach ($config['consumers'] as $name => $datas) {
        $consumer = ['name' => $name];
        $consumer['connection'] = $datas['connection'];
        $consumer['exchange'] = $datas['exchange_options'];
        $consumer['queue'] = $datas['queue_options'];

        $this->consumers[$name] = $consumer;
    }
}
/**
 * Builds and compiles a container from a booted clone of the application kernel.
 *
 * The kernel's buildContainer() method is made accessible and invoked through reflection.
 *
 * @return mixed The compiled container returned by buildContainer()
 */
private function compileContainer()
{
    // Work on a clone so that the running kernel is left untouched
    $kernel_clone = clone $this->getApplication()->getKernel();
    $kernel_clone->boot();

    $build_container = new \ReflectionMethod($kernel_clone, 'buildContainer');
    $build_container->setAccessible(true);

    $compiled = $build_container->invoke($kernel_clone);
    $compiled->getCompiler()->compile($compiled);

    return $compiled;
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment