Skip to content

Instantly share code, notes, and snippets.

@Taluu
Last active August 29, 2015 14:11
Show Gist options
  • Save Taluu/9f81e5af553333e14eb3 to your computer and use it in GitHub Desktop.
Save Taluu/9f81e5af553333e14eb3 to your computer and use it in GitHub Desktop.
<?php
use Swarrot\Consumer;
use Swarrot\Broker\MessageProvider\PeclPackageMessageProvider;
use Swarrot\Broker\MessagePublisher\PeclPackageMessagePublisher;
use Swarrot\Broker\Message;
use Swarrot\Processor\RPC\RpcClientProcessor;
use Swarrot\Processor\ProcessorInterface;
/**
 * Innermost processor of the RPC client: prints the body of each message
 * handed to it.
 */
class Processor implements ProcessorInterface
{
    /**
     * Print the body of the given message.
     *
     * @param Message $message message whose body is printed
     * @param array   $options consumer options; not read by this processor
     *
     * @return void
     */
    public function process(Message $message, array $options)
    {
        $body = $message->getBody();

        echo sprintf("[.] Got %s \n", $body);
    }
}
// RPC client script: publishes one fibonacci request and consumes its reply.

// Requested index: first command-line argument, or 5 when none is given.
$n = isset($argv[1]) ? $argv[1] : 5;

// Fail fast on anything that is not a plain (optionally negative) integer.
// Without this check a value such as "abc" would be published verbatim as
// the message body.
if (1 !== preg_match('/\A-?\d+\z/', (string) $n)) {
    fwrite(STDERR, sprintf("[x] Expected an integer argument, got \"%s\"\n", $n));
    exit(1);
}

$connection = new \AMQPConnection();
$connection->connect();
$channel = new \AMQPChannel($connection);

// Handle on the 'rpc' exchange; it is only named here, not declared.
$exchange = new \AMQPExchange($channel);
$exchange->setName('rpc');

// Reply queue: flagged exclusive, given a random 10-character hexadecimal
// name, declared, then bound to 'rpc' using its own name as routing key.
$queue = new \AMQPQueue($channel);
$queue->setFlags(\AMQP_EXCLUSIVE);
$queue->setName(substr(sha1(uniqid(mt_rand(), true)), 0, 10));
$queue->declareQueue();
$queue->bind('rpc', $queue->getName());

$messageProvider = new PeclPackageMessageProvider($queue);
$messagePublisher = new PeclPackageMessagePublisher($exchange);

// Correlation id attached to the request and passed to the consumer below.
$id = uniqid();

printf("[.] Requesting fibonacci(%s)\n", $n);

// The request names the reply queue in 'reply_to' and is published with the
// routing key 'fibo'.
$message = new Message((string) $n, ['correlation_id' => $id, 'reply_to' => $queue->getName()]);
$messagePublisher->publish($message, 'fibo');

// Consume from the reply queue. NOTE(review): presumably RpcClientProcessor
// uses 'rpc_client_correlation_id' to recognise the matching reply and stop
// the consumer afterwards -- confirm against the Swarrot documentation.
$consumer = new Consumer($messageProvider, new RpcClientProcessor(new Processor));
$consumer->consume(['rpc_client_correlation_id' => $id]);

printf("[.] Done !\n\n");
<?php
use Swarrot\Consumer;
use Swarrot\Broker\MessageProvider\PeclPackageMessageProvider;
use Swarrot\Broker\MessagePublisher\PeclPackageMessagePublisher;
use Swarrot\Broker\Message;
use Swarrot\Processor\RPC\RpcServerProcessor;
use Swarrot\Processor\Ack\AckProcessor;
use Swarrot\Processor\ProcessorInterface;
/**
 * Innermost processor of the RPC server: computes the Fibonacci number
 * requested in a message body.
 */
class Processor implements ProcessorInterface
{
    /**
     * Read n from the message body, wait |n| seconds, and return fibonacci(|n|).
     *
     * @param Message $message request whose body is cast to an integer n
     * @param array   $options consumer options; not read by this processor
     *
     * @return int|float fibonacci(|n|)
     */
    public function process(Message $message, array $options)
    {
        $body = $message->getBody();

        printf(" [.] calculating fib(%s) \n", $body);

        // Parse the body once. fibo() works on |n|, so the simulated delay
        // uses the same magnitude: handing a negative value to sleep() is an
        // error (a warning in PHP 7, a ValueError in PHP 8).
        $n = abs((int) $body);

        // act as if it should wait for something
        sleep($n);

        return $this->fibo($n);
    }

    /**
     * Compute the Fibonacci number at index |n| with a linear-time loop.
     *
     * @param int $n index; its absolute value is used
     *
     * @return int|float fibonacci(|n|); once a sum exceeds PHP_INT_MAX
     *                   (past index 92 on 64-bit builds) PHP yields a float
     */
    private function fibo($n)
    {
        $n = abs($n);

        if (1 >= $n) {
            return $n;
        }

        // Walk the sequence upwards, keeping only the last two terms.
        $previous = 0;
        $current = 1;

        for ($i = 2; $i <= $n; ++$i) {
            $next = $previous + $current;
            $previous = $current;
            $current = $next;
        }

        return $current;
    }
}
// RPC server script: consumes requests from the 'rpc_fibo' queue and runs
// them through the processor chain built below.

$amqpConnection = new \AMQPConnection();
$amqpConnection->connect();
$amqpChannel = new \AMQPChannel($amqpConnection);

// Exchange handle given to the publisher; it is named but not declared here.
$rpcExchange = new \AMQPExchange($amqpChannel);
$rpcExchange->setName('rpc');

// Queue handle for incoming requests.
// NOTE(review): the queue is neither declared nor bound in this script, so
// it presumably has to exist on the broker already -- confirm.
$requestQueue = new \AMQPQueue($amqpChannel);
$requestQueue->setName('rpc_fibo');

$provider = new PeclPackageMessageProvider($requestQueue);
$publisher = new PeclPackageMessagePublisher($rpcExchange);

// Processor chain, innermost first:
// Processor -> AckProcessor -> RpcServerProcessor.
$fibonacciProcessor = new Processor;
$ackProcessor = new AckProcessor($fibonacciProcessor, $provider);
$rpcProcessor = new RpcServerProcessor($ackProcessor, $publisher);

$server = new Consumer($provider, $rpcProcessor);
$server->consume();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment