Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save makasim/92a82cddcc074c128470ca0e824f4b24 to your computer and use it in GitHub Desktop.
Save makasim/92a82cddcc074c128470ca0e824f4b24 to your computer and use it in GitHub Desktop.
<?php
use Enqueue\AmqpExt\AmqpContext;
use Enqueue\Consumption\Extension\ReplyExtension;
use Enqueue\Consumption\QueueConsumer;
use Enqueue\Consumption\Result;
use Enqueue\Psr\PsrContext;
use Enqueue\Psr\PsrMessage;
use Enqueue\Rpc\RpcClient;
/** @var AmqpContext $c */
$c = \Enqueue\dsn_to_context('amqp://');
$q1 = $c->createQueue('Q1');
$c->declareQueue($q1);
$q2 = $c->createQueue('Q2');
$c->declareQueue($q2);
$q3 = $c->createQueue('Q3');
$c->declareQueue($q3);
$taskMessage = $c->createMessage('the task data');
$taskMessage->setCorrelationId(uniqid('', true)); // or use uuid4
$timeout = 30000; // 30 seconds
$promises = new \SplObjectStorage();
$rpcClient = new RpcClient($c);
$promises->attach($rpcClient->callAsync($q1, clone $taskMessage, $timeout));
$promises->attach($rpcClient->callAsync($q2, clone $taskMessage, $timeout));
$promises->attach($rpcClient->callAsync($q3, clone $taskMessage, $timeout));
while (count($promises)) {
foreach ($promises as $promise) {
if ($replyMessage = $promise->receiveNoWait()) {
// you may want to check the response here
$promises->detach($promise);
}
}
}
// we got all reply so now we can send a message to Q4.
$q4 = $c->createQueue('Q4');
$c->declareQueue($q4);
$c->createProducer()->send($q4, clone $taskMessage);
// CONSUMER CODE
$processor = function(PsrMessage $message, PsrContext $context) {
// do task job
return Result::reply($context->createMessage('the reply data'));
};
$queueConsumer = new QueueConsumer($c, new ReplyExtension());
$queueConsumer->bind($q1, $processor);
$queueConsumer->bind($q2, $processor);
$queueConsumer->bind($q3, $processor);
$queueConsumer->bind($q4, $processor);
$queueConsumer->consume();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment