Skip to content

Instantly share code, notes, and snippets.

@mamontov-cpp
Created June 27, 2018 14:44
Show Gist options
  • Save mamontov-cpp/fb55343800db834ce98e5a18fde19ee2 to your computer and use it in GitHub Desktop.
Save mamontov-cpp/fb55343800db834ce98e5a18fde19ee2 to your computer and use it in GitHub Desktop.
rabbitmqreceiver.php
<?
namespace Intervolga\Mybox\Tool\Front\Sku;
use Intervolga\Mybox\Main;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exception\AMQPRuntimeException;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
/**
* @class RabbitMQReceiver
* @package Intervolga\Mybox\Tool\Front\Sku
*
* Высокоуровненвый интерфейс для работы очереди RabbitMQ
*
* Сама очередь хранится в таблице, которую можно восстановить как
* CREATE TABLE `rabbitmq_ordersqueue` (
* `ID` int(11) NOT NULL AUTO_INCREMENT,
* `ORDER_ID` int(11) NOT NULL,
* PRIMARY KEY (`ID`)
* ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;
* CREATE INDEX `I_ORDER_ID` ON `rabbitmq_ordersqueue` (`ORDER_ID`);
*
* Абстракция для получения данных из RabbitMQ для импорта
*/
class RabbitMQReceiver
{
/**
* Хост для соединения
*/
const HOST = '172.16.0.30';
/**
* Порт для соединения
*/
const PORT = 5672;
/**
* Логин для соединения
*/
const LOGIN = 'site' /* 'bitrix_service' */;
/**
* Пароль для соединения
*/
const PASSWORD = 'RP%sAi=Vk4iz4=ejy4Z*' /* 'HJnPrG!_!Fsw*mKdxWf!'*/;
/**
* Имя обмена
*/
const EXCHANGE_NAME = 'bitrixExchangeDictionary';
/**
* Имя обмена для теста
*/
const TEST_EXCHANGE_NAME = 'bitrixExchangeDictionary';
/**
* Имя очереди обычной
*/
const QUEUE_NAME = 'federated.bitrixExchangeDictionary';
/**
* Имя очереди для теста
*/
const TEST_QUEUE_NAME = 'federated.bitrixExchangeDictionary';
/**
* Имя ключа для роутинга
*/
const ROUTING_KEY = 'bitrixExchangeDictionary';
/**
* Виртуальный хост
*/
const VHOST = 'mybox' /*'bitrix'*/;
/**
* Соединение инициализируется при соединении
*/
public function __construct()
{
$this->connection = null;
$this->channel = null;
$this->funcOverloadValue = ini_get('mbstring.func_overload');
}
/**
* Соединение с сервером
* @return bool успешно ли произведена отправка
*/
public function connect()
{
try {
$this->resetMBStringFuncOverload();
$this->connection = new AMQPStreamConnection(self::HOST, self::PORT, self::LOGIN, self::PASSWORD, self::VHOST);
$this->channel = $this->connection->channel();
return $this->connection->isConnected();
} catch(\Exception $e) {
echo $e->getMessage() . PHP_EOL;
$this->restoreMBStringFuncOverload();
return false;
}
}
/**
* Привязывает обмен к очереди, чтобы получать сообщения
*/
public function bind()
{
echo 'Вызов queue.declare' . PHP_EOL;
$this->channel->queue_declare(self::getQueueName(), /* passive */ false, /* durable */ true, /* exclusive*/ false, /* auto-delete*/ true);
echo 'Вызов queue.bind' . PHP_EOL;
$this->channel->queue_bind(self::getQueueName(), self::getExchangeName(), self::ROUTING_KEY);
}
/**
* Восстанавливает настройку mbstring.func_overload
*/
public function restoreMBStringFuncOverload()
{
ini_set('mbstring.func_overload', $this->funcOverloadValue);
}
/**
* Сбрасывает настройку mbstring.func_overload
*/
public function resetMBStringFuncOverload()
{
ini_set('mbstring.func_overload', 0);
}
/**
* Запускает сам новый сервер для получения данных из RabbitMQ
* @throws \Exception если случилась ошибка
*/
public function run()
{
$me = $this;
$import = new ImportProcess();
// Функция обратного вызова. которая по сути и делает у нас импорт
$callback = function ($message) use($me, $import) {
$body = $message->body;
echo "Получено сообщение" . PHP_EOL;
echo $body . PHP_EOL;
$me->restoreMBStringFuncOverload();
$import->performWithLargeDocument($body);
$me->resetMBStringFuncOverload();
gc_collect_cycles();
};
echo 'Начато получение сообщений' . PHP_EOL;
$this->channel->basic_consume(self::getQueueName(), '', false, true, false, false, $callback);
while (count($this->channel->callbacks)) {
sleep(1);
$this->channel->wait();
}
echo 'Выход';
}
/**
* Возвращает имя обмена
* @return string
*/
public function getExchangeName()
{
if (\COption::GetOptionString("grain.customsettings","IS_TEST_SITE") == "Y") {
return self::TEST_EXCHANGE_NAME;
} else {
return self::EXCHANGE_NAME;
}
}
public function getQueueName()
{
if (\COption::GetOptionString("grain.customsettings","IS_TEST_SITE") == "Y") {
return self::TEST_QUEUE_NAME;
} else {
return self::QUEUE_NAME;
}
}
/**
* Дисконнект от сервера
*/
public function disconnect()
{
$this->channel->close();
$this->connection->close();
}
/**
* Соединение с сервером
* @var AMQPStreamConnection
*/
private $connection;
/**
* Канал для соединения
* @var AMQPChannel
*/
private $channel;
/**
* Значение mbstring.func_overload
* @var int
*/
private $funcOverloadValue;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment