Skip to content

Instantly share code, notes, and snippets.

@iruslanalexan
Forked from mamontov-cpp/rabbitmqreceiver.php
Last active August 12, 2021 14:23
Show Gist options
  • Save iruslanalexan/0d2e38a6225bbbc015cc9b71d5eac353 to your computer and use it in GitHub Desktop.
Save iruslanalexan/0d2e38a6225bbbc015cc9b71d5eac353 to your computer and use it in GitHub Desktop.
rabbitmqreceiver.php
<?
error_reporting(E_ALL & ~E_NOTICE & ~E_WARNING);
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('192.168.1.139', 5672, 'localconnect', 'localconnect');
$channel = $connection->channel();
$channel->queue_declare('book', false, false, false, false);
echo " [*] Waiting for messages. To exit press CTRL+C\n";
$callback = function ($msg) {
echo ' [x] Received ', $msg->body, "\n";
};
$channel->basic_consume('book', '', false, true, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
print_r("\n");
<?php
class BookTable extends Bitrix\Main\ORM\Data\DataManager{
public static function getTableName()
{
return "a_book";
}
public static function getMap()
{
return [
'ID' => [
'data_type' => 'integer',
'primary' => true,
'autocomplete' => true,
'title' => 'ID',
],
'FIELD' => array(
'data_type' => 'string',
'title' => 'Name',
),
'AFIELD' => array(
'data_type' => 'string',
'title' => 'Name',
),
'BFIELD' => array(
'data_type' => 'string',
'title' => 'Name',
)
];
}
}
// создание таблицы
$ret=new BookTable;
try {
$ret=$ret::getEntity()->createDbTable();
} catch (Exception $e) {
}
// чтение таблицы
$parameters = [
'select' => [
'ID',
'FIELD',
]
];
$listOfBook = BookTable::getList($parameters)->fetchAll();
//добавление таблицы
$addResult = BookTable::add([
'FIELD' => "name of field",
]);
if ($addResult ->isSuccess())
$bookId = $addResult ->getId();
//обновление таблицы
$obj= new BookTable();
<?
namespace Intervolga\Mybox\Tool\Front\Sku;
use Intervolga\Mybox\Main;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exception\AMQPRuntimeException;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
/**
* @class RabbitMQReceiver
* @package Intervolga\Mybox\Tool\Front\Sku
*
* Высокоуровненвый интерфейс для работы очереди RabbitMQ
*
* Сама очередь хранится в таблице, которую можно восстановить как
* CREATE TABLE `rabbitmq_ordersqueue` (
* `ID` int(11) NOT NULL AUTO_INCREMENT,
* `ORDER_ID` int(11) NOT NULL,
* PRIMARY KEY (`ID`)
* ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;
* CREATE INDEX `I_ORDER_ID` ON `rabbitmq_ordersqueue` (`ORDER_ID`);
*
* Абстракция для получения данных из RabbitMQ для импорта
*/
class RabbitMQReceiver
{
/**
* Хост для соединения
*/
const HOST = '172.16.0.30';
/**
* Порт для соединения
*/
const PORT = 5672;
/**
* Логин для соединения
*/
const LOGIN = 'site' /* 'bitrix_service' */;
/**
* Пароль для соединения
*/
const PASSWORD = 'RP%sAi=Vk4iz4=ejy4Z*' /* 'HJnPrG!_!Fsw*mKdxWf!'*/;
/**
* Имя обмена
*/
const EXCHANGE_NAME = 'bitrixExchangeDictionary';
/**
* Имя обмена для теста
*/
const TEST_EXCHANGE_NAME = 'bitrixExchangeDictionary';
/**
* Имя очереди обычной
*/
const QUEUE_NAME = 'federated.bitrixExchangeDictionary';
/**
* Имя очереди для теста
*/
const TEST_QUEUE_NAME = 'federated.bitrixExchangeDictionary';
/**
* Имя ключа для роутинга
*/
const ROUTING_KEY = 'bitrixExchangeDictionary';
/**
* Виртуальный хост
*/
const VHOST = 'mybox' /*'bitrix'*/;
/**
* Соединение инициализируется при соединении
*/
public function __construct()
{
$this->connection = null;
$this->channel = null;
$this->funcOverloadValue = ini_get('mbstring.func_overload');
}
/**
* Соединение с сервером
* @return bool успешно ли произведена отправка
*/
public function connect()
{
try {
$this->resetMBStringFuncOverload();
$this->connection = new AMQPStreamConnection(self::HOST, self::PORT, self::LOGIN, self::PASSWORD, self::VHOST);
$this->channel = $this->connection->channel();
return $this->connection->isConnected();
} catch(\Exception $e) {
echo $e->getMessage() . PHP_EOL;
$this->restoreMBStringFuncOverload();
return false;
}
}
/**
* Привязывает обмен к очереди, чтобы получать сообщения
*/
public function bind()
{
echo 'Вызов queue.declare' . PHP_EOL;
$this->channel->queue_declare(self::getQueueName(), /* passive */ false, /* durable */ true, /* exclusive*/ false, /* auto-delete*/ true);
echo 'Вызов queue.bind' . PHP_EOL;
$this->channel->queue_bind(self::getQueueName(), self::getExchangeName(), self::ROUTING_KEY);
}
/**
* Восстанавливает настройку mbstring.func_overload
*/
public function restoreMBStringFuncOverload()
{
ini_set('mbstring.func_overload', $this->funcOverloadValue);
}
/**
* Сбрасывает настройку mbstring.func_overload
*/
public function resetMBStringFuncOverload()
{
ini_set('mbstring.func_overload', 0);
}
/**
* Запускает сам новый сервер для получения данных из RabbitMQ
* @throws \Exception если случилась ошибка
*/
public function run()
{
$me = $this;
$import = new ImportProcess();
// Функция обратного вызова. которая по сути и делает у нас импорт
$callback = function ($message) use($me, $import) {
$body = $message->body;
echo "Получено сообщение" . PHP_EOL;
echo $body . PHP_EOL;
$me->restoreMBStringFuncOverload();
$import->performWithLargeDocument($body);
$me->resetMBStringFuncOverload();
gc_collect_cycles();
};
echo 'Начато получение сообщений' . PHP_EOL;
$this->channel->basic_consume(self::getQueueName(), '', false, true, false, false, $callback);
while (count($this->channel->callbacks)) {
sleep(1);
$this->channel->wait();
}
echo 'Выход';
}
/**
* Возвращает имя обмена
* @return string
*/
public function getExchangeName()
{
if (\COption::GetOptionString("grain.customsettings","IS_TEST_SITE") == "Y") {
return self::TEST_EXCHANGE_NAME;
} else {
return self::EXCHANGE_NAME;
}
}
public function getQueueName()
{
if (\COption::GetOptionString("grain.customsettings","IS_TEST_SITE") == "Y") {
return self::TEST_QUEUE_NAME;
} else {
return self::QUEUE_NAME;
}
}
/**
* Дисконнект от сервера
*/
public function disconnect()
{
$this->channel->close();
$this->connection->close();
}
/**
* Соединение с сервером
* @var AMQPStreamConnection
*/
private $connection;
/**
* Канал для соединения
* @var AMQPChannel
*/
private $channel;
/**
* Значение mbstring.func_overload
* @var int
*/
private $funcOverloadValue;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment