Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
A symfony command which allow to create all configuration in RabbitMQ according to old_sound_rabbit_mq configuration
<?php
namespace AppBundle\Command;
use Symfony\Bundle\FrameworkBundle\Command\AbstractConfigCommand;
use Symfony\Component\Config\Definition\Processor;
use Symfony\Component\Console\Exception\LogicException;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Question\Question;
use Symfony\Component\DependencyInjection\Extension\Extension;
use Symfony\Component\Process\Process;
class InitRabbitmqCommand extends AbstractConfigCommand
{
/**
* @var string
*/
private $rabbitmqadmin_commandline;
/**
* @var string
*/
private $rabbitmqadmin_username;
/**
* @var string
*/
private $rabbitmqadmin_password;
/**
* @var array
*/
private $connections = [];
/**
* @var array
*/
private $consumers = [];
/**
* @var OutputInterface
*/
private $output;
/**
* @var bool
*/
private $is_debug = false;
/**
* {@inheritdoc}
*/
public function configure(): void
{
$this
->setName('init:rabbitmq')
->setDescription('Initialize the connections, queues, exchanges for RabbitMQ')
->addOption(
'rabbitmq-admin-bin',
'b',
InputOption::VALUE_REQUIRED,
'The path of the rabbitmqadmin binary',
'/usr/bin/rabbitmqadmin'
)
->addOption(
'debug',
'd',
InputOption::VALUE_NONE,
'Show debug message'
)
;
}
/**
* {@inheritdoc}
*/
public function execute(InputInterface $input, OutputInterface $output): void
{
$this->is_debug = $input->getOption('debug');
// Ask the username and password for an RabbitMQ admin user
$this->askInformations($input, $output);
// Initialize the configuration
$this->initConfiguration();
// Foreach consumer, check and create the requirements if necessary
$this->rabbitmqadmin_commandline = $input->getOption('rabbitmq-admin-bin');
$this->output = $output;
foreach ($this->consumers as $consumer) {
$this->processConsumer($consumer);
$this->output->writeln('<comment>----------------</comment>');
}
}
/**
* Check and configure all requirements for consumer.
*
* @param $consumer
*/
private function processConsumer($consumer): void
{
$this->output->writeln(sprintf('<comment>Process consumer "%s"</comment>', $consumer['name']));
$connection = $consumer['connection'];
// Check and process the RabbitMQ user
$user = $this->connections[$connection]['user'];
$this->output->writeln(sprintf('<comment>Process user "%s"</comment>', $user));
if (false === $this->isObjectExists('users', $user, $connection)) {
$this->declareUser($connection);
} else {
$this->output->writeln(sprintf('<info>The user "%s" already exist</info>', $user));
}
// Check and process the vhost
$this->processVhost($connection);
// Check and process permissions for user
$this->processPermissions($connection);
// Check and process the RabbitMQ queue
$queue = $consumer['queue']['name'];
$this->output->writeln(sprintf('<comment>Process queue "%s"</comment>', $queue));
if (false === $this->isObjectExists('queues', $queue, $connection, true)) {
$this->declareQueue($connection, $consumer['queue']);
} else {
$this->output->writeln(sprintf('<info>The queue "%s" already exist</info>', $queue));
}
// Check and process the RabbitMQ exchange
$exchange = $consumer['exchange']['name'];
$this->output->writeln(sprintf('<comment>Process exchange "%s"</comment>', $exchange));
if (false === $this->isObjectExists('exchanges', $exchange, $connection, true)) {
$this->declareExchange($connection, $consumer['exchange']);
} else {
$this->output->writeln(sprintf('<info>The exchange "%s" already exist</info>', $exchange));
}
// Check and process the RabbitMQ binding
$this->processBinding($connection, $consumer);
}
/**
* Declare the user configured for the connection.
*
* @param string $connection
*/
private function declareUser(string $connection): void
{
$user = $this->connections[$connection]['user'];
$password = $this->connections[$connection]['password'];
$this->executeRabbitMqAdminCommand('declare', 'user', $connection, [
'name' => $user,
'password' => $password,
'tags' => '',
]);
$this->output->writeln(sprintf('<info>The user "%s" is created</info>', $user));
}
/**
* Declare the vhost configured for the connection.
*
* @param string $connection
*/
private function processVhost(string $connection): void
{
$vhost = $this->connections[$connection]['vhost'];
$this->output->writeln(sprintf('<comment>Process vhost "%s"</comment>', $vhost));
if (true === $this->isObjectExists('vhosts', $vhost, $connection)) {
$this->output->writeln(sprintf('<info>The vhost "%s" already exists</info>', $vhost));
return;
}
$this->executeRabbitMqAdminCommand('declare', 'vhost', $connection, [
'name' => $vhost,
]);
$this->output->writeln(sprintf('<info>The vhost "%s" is created</info>', $vhost));
}
/**
* Declare the queue.
*
* @param string $connection
* @param array $queue
*/
private function declareQueue(string $connection, array $queue): void
{
$this->executeRabbitMqAdminCommand('declare', 'queue', $connection, [
'name' => $queue['name'],
'--vhost' => $this->connections[$connection]['vhost'],
'auto_delete' => $queue['auto_delete'] ? 'true' : 'false',
'durable' => $queue['durable'] ? 'true' : 'false',
]);
$this->output->writeln(sprintf('<info>The queue "%s" is created</info>', $queue['name']));
}
/**
* Declare the queue.
*
* @param string $connection
* @param array $exchange
*/
private function declareExchange(string $connection, array $exchange): void
{
$this->executeRabbitMqAdminCommand('declare', 'exchange', $connection, [
'name' => $exchange['name'],
'type' => $exchange['type'],
'--vhost' => $this->connections[$connection]['vhost'],
'auto_delete' => $exchange['auto_delete'] ? 'true' : 'false',
'durable' => $exchange['durable'] ? 'true' : 'false',
'internal' => $exchange['internal'] ? 'true' : 'false',
]);
$this->output->writeln(sprintf('<info>The exchange "%s" is created</info>', $exchange['name']));
}
/**
* Check and declare permission for user and vhost.
*
* @param string $connection
*/
private function processPermissions(string $connection): void
{
$vhost = $this->connections[$connection]['vhost'];
foreach ([$this->connections[$connection]['user'], $this->rabbitmqadmin_username] as $user) {
$this->output->writeln(sprintf(
'<comment>Process permission for user "%s" to "%s"</comment>',
$user,
$vhost
));
$output = $this->executeRabbitMqAdminCommand('list', 'permissions', $connection, [
'user', 'vhost', 'configure', 'read', 'write', // It's the column order
]);
$regex = '/'.
'^\|\s*'.preg_quote($user, '/').'\s*'.
'\|\s*'.preg_quote($vhost, '/').'\s*'.
'\|\s*'.preg_quote('.*', '/').'\s*'.
'\|\s*'.preg_quote('.*', '/').'\s*'.
'\|\s*'.preg_quote('.*', '/').'\s*\|'.
'/mU'
;
if (1 === preg_match($regex, $output)) {
$this->output->writeln(sprintf(
'<info>Permission for user "%s" to "%s" is already setting</info>',
$user,
$vhost
));
continue;
}
$this->executeRabbitMqAdminCommand('declare', 'permission', $connection, [
'vhost' => $vhost,
'user' => $user,
'configure' => '.*',
'write' => '.*',
'read' => '.*',
]);
$this->output->writeln(sprintf(
'<info>Permission for user "%s" to "%s" is setting</info>',
$user,
$vhost
));
}
}
private function processBinding(string $connection, array $consumer): void
{
$source = $consumer['exchange']['name'];
$destination = $consumer['queue']['name'];
$this->output->writeln(sprintf(
'<comment>Process binding for "%s" to "%s"</comment>',
$source,
$destination
));
$output = $this->executeRabbitMqAdminCommand('list', 'bindings', $connection, [
'source', 'destination', // It's the column order
'--vhost' => $this->connections[$connection]['vhost'],
]);
$regex = '/'.
'^\|\s*'.preg_quote($source, '/').'\s*'.
'\|\s*'.preg_quote($destination, '/').'\s*\|'.
'/mU'
;
if (1 === preg_match($regex, $output)) {
$this->output->writeln(sprintf(
'<info>Binding for "%s" to "%s" already exists</info>',
$source,
$destination
));
return;
}
$this->executeRabbitMqAdminCommand('declare', 'binding', $connection, [
'--vhost' => $this->connections[$connection]['vhost'],
'source' => $source,
'destination' => $destination,
]);
$this->output->writeln(sprintf(
'<info>Binding for "%s" to "%s" is created</info>',
$source,
$destination
));
}
/**
* Check if an object exists in RabbitMQ.
*
* @param string $object
* @param string $value
* @param string $connection
* @param bool $use_vhost
*
* @return bool
*/
private function isObjectExists(string $object, string $value, string $connection, bool $use_vhost = false): bool
{
$arguments = [
'name',
];
if (true === $use_vhost) {
$arguments['--vhost'] = $this->connections[$connection]['vhost'];
}
$output = $this->executeRabbitMqAdminCommand('list', $object, $connection, $arguments);
return 1 === preg_match('/^\|\s*'.preg_quote($value, '/').'\s*\|/mU', $output);
}
/**
* Execute a RabbitMQ command.
*
* @param string $type
* @param string $object
* @param string $connection_name
* @param array $arguments
*
* @return string
*/
private function executeRabbitMqAdminCommand(
string $type,
string $object,
string $connection_name,
array $arguments = []
): string {
$process_datas = [
$this->rabbitmqadmin_commandline,
$type,
$object,
'-H', $this->connections[$connection_name]['host'],
'-P', $this->connections[$connection_name]['port'],
'-u', $this->rabbitmqadmin_username,
'-p', $this->rabbitmqadmin_password,
];
foreach ($arguments as $name => $value) {
if (true === is_numeric($name)) {
$process_datas[] = $value;
} else {
$process_datas[] = $name.'='.$value;
}
}
$process = new Process($process_datas);
if (true === $this->is_debug) {
$this->output->writeln(sprintf('<error>DEBUG : %s</error>', $process->getCommandLine()));
}
$process->run();
if (false === $process->isSuccessful()) {
throw new \RuntimeException($process->getErrorOutput());
}
return $process->getOutput();
}
/**
* Ask to the user: the username and password of the RabbitMQ admin.
*
* @param InputInterface $input
* @param OutputInterface $output
*/
public function askInformations(InputInterface $input, OutputInterface $output): void
{
$helper = $this->getHelper('question');
// Admin Username
$question = new Question('Please enter the admin username for RabbitMQ: ');
$question->setValidator(function ($answer) {
if (false === \is_string($answer) || true === empty($answer)) {
throw new \RuntimeException(
'The username of the admin RabbitMQ is required'
);
}
return $answer;
});
$this->rabbitmqadmin_username = $helper->ask($input, $output, $question);
// Admin password
$question = new Question('Please enter the admin password for RabbitMQ: ');
$question->setValidator(function ($answer) {
if (false === \is_string($answer) || true === empty($answer)) {
throw new \RuntimeException(
'The password of the admin RabbitMQ is required'
);
}
return $answer;
});
$question->setHidden(true);
$this->rabbitmqadmin_password = $helper->ask($input, $output, $question);
}
/**
* Read the RabbitMQ configuration and initialize the connections and consumers informations.
*/
private function initConfiguration(): void
{
/** @var Extension $extension */
$extension = $this->findExtension('old_sound_rabbit_mq');
$container = $this->compileContainer();
$extensionAlias = $extension->getAlias();
$configs = $container->getExtensionConfig($extensionAlias);
$configuration = $extension->getConfiguration($configs, $container);
if (null === $configuration) {
throw new LogicException('Configuration cannot be null');
}
$this->validateConfiguration($extension, $configuration);
$configs = $container->resolveEnvPlaceholders($container->getParameterBag()->resolveValue($configs));
$processor = new Processor();
$config = $container->resolveEnvPlaceholders($container->getParameterBag()->resolveValue($processor->processConfiguration($configuration, $configs)));
$this->initConnections($config);
$this->initConsumers($config);
}
/**
* Initialize the connections configuration.
*
* @param array $config The RabbitMQ configuration
*/
private function initConnections(array $config): void
{
foreach ($config['connections'] as $name => $datas) {
$url_datas = [];
if (false === empty($datas['url'])) {
$url_datas = parse_url($datas['url']);
}
$host = $url_datas['host'] ?? $datas['host'];
$port = $url_datas['port'] ?? $datas['port'];
$user = $url_datas['user'] ?? $datas['user'];
$password = $url_datas['pass'] ?? $datas['password'];
$vhost = $url_datas['path'] ?? $datas['vhost'];
$this->connections[$name] = [
'host' => $host,
'port' => $port,
'user' => $user,
'password' => $password,
'vhost' => $vhost,
];
}
}
/**
* Initialize the consumers configuration.
*
* @param array $config The RabbitMQ configuration
*/
private function initConsumers(array $config): void
{
foreach ($config['consumers'] as $name => $datas) {
$this->consumers[$name] = [
'name' => $name,
'connection' => $datas['connection'],
'exchange' => $datas['exchange_options'],
'queue' => $datas['queue_options'],
];
}
}
private function compileContainer()
{
$kernel = clone $this->getApplication()->getKernel();
$kernel->boot();
$method = new \ReflectionMethod($kernel, 'buildContainer');
$method->setAccessible(true);
$container = $method->invoke($kernel);
$container->getCompiler()->compile($container);
return $container;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment