Created
June 27, 2018 14:44
-
-
Save mamontov-cpp/fb55343800db834ce98e5a18fde19ee2 to your computer and use it in GitHub Desktop.
rabbitmqreceiver.php
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<? | |
namespace Intervolga\Mybox\Tool\Front\Sku; | |
use Intervolga\Mybox\Main; | |
use PhpAmqpLib\Channel\AMQPChannel; | |
use PhpAmqpLib\Connection\AMQPStreamConnection; | |
use PhpAmqpLib\Exception\AMQPRuntimeException; | |
use PhpAmqpLib\Message\AMQPMessage; | |
use PhpAmqpLib\Wire\AMQPTable; | |
/** | |
* @class RabbitMQReceiver | |
* @package Intervolga\Mybox\Tool\Front\Sku | |
* | |
* Высокоуровненвый интерфейс для работы очереди RabbitMQ | |
* | |
* Сама очередь хранится в таблице, которую можно восстановить как | |
* CREATE TABLE `rabbitmq_ordersqueue` ( | |
* `ID` int(11) NOT NULL AUTO_INCREMENT, | |
* `ORDER_ID` int(11) NOT NULL, | |
* PRIMARY KEY (`ID`) | |
* ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci; | |
* CREATE INDEX `I_ORDER_ID` ON `rabbitmq_ordersqueue` (`ORDER_ID`); | |
* | |
* Абстракция для получения данных из RabbitMQ для импорта | |
*/ | |
class RabbitMQReceiver | |
{ | |
/** | |
* Хост для соединения | |
*/ | |
const HOST = '172.16.0.30'; | |
/** | |
* Порт для соединения | |
*/ | |
const PORT = 5672; | |
/** | |
* Логин для соединения | |
*/ | |
const LOGIN = 'site' /* 'bitrix_service' */; | |
/** | |
* Пароль для соединения | |
*/ | |
const PASSWORD = 'RP%sAi=Vk4iz4=ejy4Z*' /* 'HJnPrG!_!Fsw*mKdxWf!'*/; | |
/** | |
* Имя обмена | |
*/ | |
const EXCHANGE_NAME = 'bitrixExchangeDictionary'; | |
/** | |
* Имя обмена для теста | |
*/ | |
const TEST_EXCHANGE_NAME = 'bitrixExchangeDictionary'; | |
/** | |
* Имя очереди обычной | |
*/ | |
const QUEUE_NAME = 'federated.bitrixExchangeDictionary'; | |
/** | |
* Имя очереди для теста | |
*/ | |
const TEST_QUEUE_NAME = 'federated.bitrixExchangeDictionary'; | |
/** | |
* Имя ключа для роутинга | |
*/ | |
const ROUTING_KEY = 'bitrixExchangeDictionary'; | |
/** | |
* Виртуальный хост | |
*/ | |
const VHOST = 'mybox' /*'bitrix'*/; | |
/** | |
* Соединение инициализируется при соединении | |
*/ | |
public function __construct() | |
{ | |
$this->connection = null; | |
$this->channel = null; | |
$this->funcOverloadValue = ini_get('mbstring.func_overload'); | |
} | |
/** | |
* Соединение с сервером | |
* @return bool успешно ли произведена отправка | |
*/ | |
public function connect() | |
{ | |
try { | |
$this->resetMBStringFuncOverload(); | |
$this->connection = new AMQPStreamConnection(self::HOST, self::PORT, self::LOGIN, self::PASSWORD, self::VHOST); | |
$this->channel = $this->connection->channel(); | |
return $this->connection->isConnected(); | |
} catch(\Exception $e) { | |
echo $e->getMessage() . PHP_EOL; | |
$this->restoreMBStringFuncOverload(); | |
return false; | |
} | |
} | |
/** | |
* Привязывает обмен к очереди, чтобы получать сообщения | |
*/ | |
public function bind() | |
{ | |
echo 'Вызов queue.declare' . PHP_EOL; | |
$this->channel->queue_declare(self::getQueueName(), /* passive */ false, /* durable */ true, /* exclusive*/ false, /* auto-delete*/ true); | |
echo 'Вызов queue.bind' . PHP_EOL; | |
$this->channel->queue_bind(self::getQueueName(), self::getExchangeName(), self::ROUTING_KEY); | |
} | |
/** | |
* Восстанавливает настройку mbstring.func_overload | |
*/ | |
public function restoreMBStringFuncOverload() | |
{ | |
ini_set('mbstring.func_overload', $this->funcOverloadValue); | |
} | |
/** | |
* Сбрасывает настройку mbstring.func_overload | |
*/ | |
public function resetMBStringFuncOverload() | |
{ | |
ini_set('mbstring.func_overload', 0); | |
} | |
/** | |
* Запускает сам новый сервер для получения данных из RabbitMQ | |
* @throws \Exception если случилась ошибка | |
*/ | |
public function run() | |
{ | |
$me = $this; | |
$import = new ImportProcess(); | |
// Функция обратного вызова. которая по сути и делает у нас импорт | |
$callback = function ($message) use($me, $import) { | |
$body = $message->body; | |
echo "Получено сообщение" . PHP_EOL; | |
echo $body . PHP_EOL; | |
$me->restoreMBStringFuncOverload(); | |
$import->performWithLargeDocument($body); | |
$me->resetMBStringFuncOverload(); | |
gc_collect_cycles(); | |
}; | |
echo 'Начато получение сообщений' . PHP_EOL; | |
$this->channel->basic_consume(self::getQueueName(), '', false, true, false, false, $callback); | |
while (count($this->channel->callbacks)) { | |
sleep(1); | |
$this->channel->wait(); | |
} | |
echo 'Выход'; | |
} | |
/** | |
* Возвращает имя обмена | |
* @return string | |
*/ | |
public function getExchangeName() | |
{ | |
if (\COption::GetOptionString("grain.customsettings","IS_TEST_SITE") == "Y") { | |
return self::TEST_EXCHANGE_NAME; | |
} else { | |
return self::EXCHANGE_NAME; | |
} | |
} | |
public function getQueueName() | |
{ | |
if (\COption::GetOptionString("grain.customsettings","IS_TEST_SITE") == "Y") { | |
return self::TEST_QUEUE_NAME; | |
} else { | |
return self::QUEUE_NAME; | |
} | |
} | |
/** | |
* Дисконнект от сервера | |
*/ | |
public function disconnect() | |
{ | |
$this->channel->close(); | |
$this->connection->close(); | |
} | |
/** | |
* Соединение с сервером | |
* @var AMQPStreamConnection | |
*/ | |
private $connection; | |
/** | |
* Канал для соединения | |
* @var AMQPChannel | |
*/ | |
private $channel; | |
/** | |
* Значение mbstring.func_overload | |
* @var int | |
*/ | |
private $funcOverloadValue; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment