Last active
June 22, 2017 11:20
-
-
Save makasim/92a82cddcc074c128470ca0e824f4b24 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
use Enqueue\AmqpExt\AmqpContext; | |
use Enqueue\Consumption\Extension\ReplyExtension; | |
use Enqueue\Consumption\QueueConsumer; | |
use Enqueue\Consumption\Result; | |
use Enqueue\Psr\PsrContext; | |
use Enqueue\Psr\PsrMessage; | |
use Enqueue\Rpc\RpcClient; | |
// CLIENT CODE
//
// Fan the same task out to Q1, Q2 and Q3 as asynchronous RPC calls, wait
// until every call has been answered, and only then publish the task to Q4.

/** @var AmqpContext $c */
$c = \Enqueue\dsn_to_context('amqp://');

// Declare the three queues the task is fanned out to.
$q1 = $c->createQueue('Q1');
$c->declareQueue($q1);
$q2 = $c->createQueue('Q2');
$c->declareQueue($q2);
$q3 = $c->createQueue('Q3');
$c->declareQueue($q3);

$taskMessage = $c->createMessage('the task data');
$taskMessage->setCorrelationId(uniqid('', true)); // or use uuid4

$timeout = 30000; // 30 seconds, expressed in milliseconds

// One promise per outstanding RPC call; a promise is detached once its reply
// has been received. Each call gets its own clone of the task message.
$promises = new \SplObjectStorage();
$rpcClient = new RpcClient($c);
$promises->attach($rpcClient->callAsync($q1, clone $taskMessage, $timeout));
$promises->attach($rpcClient->callAsync($q2, clone $taskMessage, $timeout));
$promises->attach($rpcClient->callAsync($q3, clone $taskMessage, $timeout));

// receiveNoWait() is polled in a loop, so the overall wait has to be bounded
// here; otherwise a single missing reply would spin this loop forever.
$deadline = microtime(true) + ($timeout / 1000);

while (count($promises) > 0) {
    // Iterate over a snapshot: detaching the current element while iterating
    // the SplObjectStorage itself makes foreach skip the following element.
    foreach (iterator_to_array($promises, false) as $promise) {
        if ($replyMessage = $promise->receiveNoWait()) {
            // you may want to check the response here
            $promises->detach($promise);
        }
    }

    if (count($promises) === 0) {
        break;
    }

    if (microtime(true) >= $deadline) {
        throw new \RuntimeException(sprintf(
            'Timed out after %d ms while waiting for %d RPC reply(ies).',
            $timeout,
            count($promises)
        ));
    }

    // Back off briefly between polling rounds instead of burning a CPU core.
    usleep(10000);
}

// All replies have been received, so now we can send a message to Q4.
$q4 = $c->createQueue('Q4');
$c->declareQueue($q4);
$c->createProducer()->send($q4, clone $taskMessage);
// CONSUMER CODE
//
// A single processor is bound to all four queues and then the consumer is
// started. It relies on $c and $q1..$q4 created earlier in this script.

/**
 * Processes one task message and returns a reply result.
 *
 * The incoming message is not inspected here; the reply body is a fixed
 * string built through the given context.
 * NOTE(review): presumably ReplyExtension is what delivers this reply to the
 * caller's reply queue - confirm against the Enqueue documentation.
 *
 * @param PsrMessage $message the received task message (unused in this demo)
 * @param PsrContext $context context used to create the reply message
 *
 * @return mixed whatever Result::reply() returns for the created message
 */
$processor = function (PsrMessage $message, PsrContext $context) {
    // do task job
    $reply = $context->createMessage('the reply data');

    return Result::reply($reply);
};

$replyExtension = new ReplyExtension();
$queueConsumer = new QueueConsumer($c, $replyExtension);

// Same processor for every queue, bound in the original Q1..Q4 order.
foreach ([$q1, $q2, $q3, $q4] as $queue) {
    $queueConsumer->bind($queue, $processor);
}

$queueConsumer->consume();
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment