Skip to content

Instantly share code, notes, and snippets.

@des1roer
Created December 11, 2018 06:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save des1roer/c4997c457000e0febabb487a610c646d to your computer and use it in GitHub Desktop.
Save des1roer/c4997c457000e0febabb487a610c646d to your computer and use it in GitHub Desktop.
<?php
public function actionSend()
{
$connection = new AMQPStreamConnection(
'***',
***,
'**',
'**',
'**'
);
$channel = $connection->channel();
$channel->queue_declare(
'invoice_queue', #queue name - Имя очереди может содержать до 255 байт UTF-8 символов
false, #passive - может использоваться для проверки того, инициирован ли обмен, без того, чтобы изменять состояние сервера
true, #durable - убедимся, что RabbitMQ никогда не потеряет очередь при падении - очередь переживёт перезагрузку брокера
false, #exclusive - используется только одним соединением, и очередь будет удалена при закрытии соединения
false #autodelete - очередь удаляется, когда отписывается последний подписчик
);
$msg = new AMQPMessage(
'32423',
array('delivery_mode' => 2) #создаёт сообщение постоянным, чтобы оно не потерялось при падении или закрытии сервера
);
$channel->basic_publish(
$msg, #сообщение
'', #обмен
'invoice_queue' #ключ маршрутизации (очередь)
);
$channel->close();
$connection->close();
}
public function actionListen()
{
$connection = new AMQPStreamConnection(
'***',
***,
'**',
'**',
'**'
);
$channel = $connection->channel();
$channel->queue_declare(
'invoice_queue', #queue name - Имя очереди может содержать до 255 байт UTF-8 символов
false, #passive - может использоваться для проверки того, инициирован ли обмен, без того, чтобы изменять состояние сервера
true, #durable - убедимся, что RabbitMQ никогда не потеряет очередь при падении - очередь переживёт перезагрузку брокера
false, #exclusive - используется только одним соединением, и очередь будет удалена при закрытии соединения
false #autodelete - очередь удаляется, когда отписывается последний подписчик
);
/**
* не отправляем новое сообщение на обработчик, пока он
* не обработал и не подтвердил предыдущее. Вместо этого
* направляем сообщение на любой свободный обработчик
*/
$channel->basic_qos(
null, #размер предварительной выборки - размер окна предварительнйо выборки в октетах, null означает “без определённого ограничения”
1, #количество предварительных выборок - окна предварительных выборок в рамках целого сообщения
null #глобальный - global=null означает, что настройки QoS должны применяться для получателей, global=true означает, что настройки QoS должны применяться к каналу
);
/**
* оповещает о своей заинтересованности в получении
* сообщений из определённой очереди. В таком случае мы
* говорим, что они регистрируют получателя, или устанавливают
* подписку на очередь. Каждый получатель (подписка) имеет
* идентификатор, называемый “тег получателя”.
*/
$channel->basic_consume(
'invoice_queue', #очередь
'', #тег получателя - Идентификатор получателя, валидный в пределах текущего канала. Просто строка
false, #не локальный - TRUE: сервер не будет отправлять сообщения соединениям, которые сам опубликовал
false, #без подтверждения - false: подтверждения включены, true - подтверждения отключены. отправлять соответствующее подтверждение обработчику, как только задача будет выполнена
false, #эксклюзивная - к очереди можно получить доступ только в рамках текущего соединения
false, #не ждать - TRUE: сервер не будет отвечать методу. Клиент не должен ждать ответа
array($this, 'process') #функция обратного вызова - метод, который будет принимать сообщение
);
while(count($channel->callbacks)) {
$this->stdout('Слежу за входящими сообщениями' . PHP_EOL);
$channel->wait();
}
$channel->close();
$connection->close();
}
public function process(AMQPMessage $msg)
{
$this->generatePdf()->sendEmail();
$this->stdout($msg->delivery_info['delivery_tag'] . PHP_EOL);
/**
* Если получатель умирает, не отправив подтверждения, брокер
* AMQP пошлёт сообщение другому получателю. Если свободных
* на данный момент нет - брокер подождёт до тех пор, пока
* освободится хотя-бы один зарегистрированный получатель
* на эту очередь, прежде чем попытаться заново доставить
* сообщение
*/
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
}
/**
* Генерирует PDF файл с накладной
*
* @return \console\controllers\YadController
* @throws \Exception
*/
private function generatePdf()
{
$this->stdout('generatePdf' . PHP_EOL);
/**
* Симулируем время обработки PDF. Это занимает от 2 до 5 секунд
*/
sleep(random_int(2, 5));
return $this;
}
/**
* Отправляет письмо
*
* @return \console\controllers\YadController
* @throws \Exception
*/
private function sendEmail()
{
$this->stdout('sendEmail' . PHP_EOL);
/**
* Симулируем время отправки письма. Занимает 1-3 секунды
*/
sleep(random_int(1,3));
return $this;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment