Created
December 11, 2018 06:35
-
-
Save des1roer/c4997c457000e0febabb487a610c646d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
public function actionSend() | |
{ | |
$connection = new AMQPStreamConnection( | |
'***', | |
***, | |
'**', | |
'**', | |
'**' | |
); | |
$channel = $connection->channel(); | |
$channel->queue_declare( | |
'invoice_queue', #queue name - Имя очереди может содержать до 255 байт UTF-8 символов | |
false, #passive - может использоваться для проверки того, инициирован ли обмен, без того, чтобы изменять состояние сервера | |
true, #durable - убедимся, что RabbitMQ никогда не потеряет очередь при падении - очередь переживёт перезагрузку брокера | |
false, #exclusive - используется только одним соединением, и очередь будет удалена при закрытии соединения | |
false #autodelete - очередь удаляется, когда отписывается последний подписчик | |
); | |
$msg = new AMQPMessage( | |
'32423', | |
array('delivery_mode' => 2) #создаёт сообщение постоянным, чтобы оно не потерялось при падении или закрытии сервера | |
); | |
$channel->basic_publish( | |
$msg, #сообщение | |
'', #обмен | |
'invoice_queue' #ключ маршрутизации (очередь) | |
); | |
$channel->close(); | |
$connection->close(); | |
} | |
public function actionListen() | |
{ | |
$connection = new AMQPStreamConnection( | |
'***', | |
***, | |
'**', | |
'**', | |
'**' | |
); | |
$channel = $connection->channel(); | |
$channel->queue_declare( | |
'invoice_queue', #queue name - Имя очереди может содержать до 255 байт UTF-8 символов | |
false, #passive - может использоваться для проверки того, инициирован ли обмен, без того, чтобы изменять состояние сервера | |
true, #durable - убедимся, что RabbitMQ никогда не потеряет очередь при падении - очередь переживёт перезагрузку брокера | |
false, #exclusive - используется только одним соединением, и очередь будет удалена при закрытии соединения | |
false #autodelete - очередь удаляется, когда отписывается последний подписчик | |
); | |
/** | |
* не отправляем новое сообщение на обработчик, пока он | |
* не обработал и не подтвердил предыдущее. Вместо этого | |
* направляем сообщение на любой свободный обработчик | |
*/ | |
$channel->basic_qos( | |
null, #размер предварительной выборки - размер окна предварительнйо выборки в октетах, null означает “без определённого ограничения” | |
1, #количество предварительных выборок - окна предварительных выборок в рамках целого сообщения | |
null #глобальный - global=null означает, что настройки QoS должны применяться для получателей, global=true означает, что настройки QoS должны применяться к каналу | |
); | |
/** | |
* оповещает о своей заинтересованности в получении | |
* сообщений из определённой очереди. В таком случае мы | |
* говорим, что они регистрируют получателя, или устанавливают | |
* подписку на очередь. Каждый получатель (подписка) имеет | |
* идентификатор, называемый “тег получателя”. | |
*/ | |
$channel->basic_consume( | |
'invoice_queue', #очередь | |
'', #тег получателя - Идентификатор получателя, валидный в пределах текущего канала. Просто строка | |
false, #не локальный - TRUE: сервер не будет отправлять сообщения соединениям, которые сам опубликовал | |
false, #без подтверждения - false: подтверждения включены, true - подтверждения отключены. отправлять соответствующее подтверждение обработчику, как только задача будет выполнена | |
false, #эксклюзивная - к очереди можно получить доступ только в рамках текущего соединения | |
false, #не ждать - TRUE: сервер не будет отвечать методу. Клиент не должен ждать ответа | |
array($this, 'process') #функция обратного вызова - метод, который будет принимать сообщение | |
); | |
while(count($channel->callbacks)) { | |
$this->stdout('Слежу за входящими сообщениями' . PHP_EOL); | |
$channel->wait(); | |
} | |
$channel->close(); | |
$connection->close(); | |
} | |
public function process(AMQPMessage $msg) | |
{ | |
$this->generatePdf()->sendEmail(); | |
$this->stdout($msg->delivery_info['delivery_tag'] . PHP_EOL); | |
/** | |
* Если получатель умирает, не отправив подтверждения, брокер | |
* AMQP пошлёт сообщение другому получателю. Если свободных | |
* на данный момент нет - брокер подождёт до тех пор, пока | |
* освободится хотя-бы один зарегистрированный получатель | |
* на эту очередь, прежде чем попытаться заново доставить | |
* сообщение | |
*/ | |
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); | |
} | |
/** | |
* Генерирует PDF файл с накладной | |
* | |
* @return \console\controllers\YadController | |
* @throws \Exception | |
*/ | |
private function generatePdf() | |
{ | |
$this->stdout('generatePdf' . PHP_EOL); | |
/** | |
* Симулируем время обработки PDF. Это занимает от 2 до 5 секунд | |
*/ | |
sleep(random_int(2, 5)); | |
return $this; | |
} | |
/** | |
* Отправляет письмо | |
* | |
* @return \console\controllers\YadController | |
* @throws \Exception | |
*/ | |
private function sendEmail() | |
{ | |
$this->stdout('sendEmail' . PHP_EOL); | |
/** | |
* Симулируем время отправки письма. Занимает 1-3 секунды | |
*/ | |
sleep(random_int(1,3)); | |
return $this; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment