Skip to content

Instantly share code, notes, and snippets.

@jakubkulhan
Created June 2, 2015 21:00
Show Gist options
  • Save jakubkulhan/d761346f29cf20d1677f to your computer and use it in GitHub Desktop.
Save jakubkulhan/d761346f29cf20d1677f to your computer and use it in GitHub Desktop.
BunnyPHP RPC example
<?php
namespace Example;
use Bunny\Channel;
use Bunny\Client;
use Bunny\Message;
use React\Promise\Deferred;
use React\Promise\PromiseInterface;
require_once __DIR__ . "/vendor/autoload.php";
/**
 * RPC client on top of a Bunny channel.
 *
 * Requests are published to the server queue with a unique "correlation-id"
 * and a "reply-to" header naming this client's private callback queue. Each
 * response consumed from the callback queue resolves the promise returned by
 * the matching call().
 */
class RPCClient
{
    /** @var Channel */
    private $channel;

    /** @var string name of the queue requests are published to */
    private $serverQueue;

    /** @var string|null name of the server-generated reply queue; null until init() has run */
    private $callbackQueue;

    /** @var Deferred[] calls still awaiting a response, keyed by correlation id */
    private $callbacks = [];

    /**
     * @param Channel $channel
     * @param string $serverQueue
     */
    public function __construct(Channel $channel, $serverQueue)
    {
        $this->channel = $channel;
        $this->serverQueue = $serverQueue;
    }

    /**
     * Declares the server queue and a private reply queue, then starts
     * consuming responses. Must be called before call().
     */
    public function init()
    {
        // Same declaration as the server side uses (third argument true; durable,
        // assuming Bunny's queueDeclare($queue, $passive, $durable, $exclusive, ...) order).
        $this->channel->queueDeclare($this->serverQueue, false, true);

        // Empty name lets the broker generate one; the fourth argument (true)
        // is presumably the "exclusive" flag under the same parameter order.
        $callbackQueue = $this->channel->queueDeclare("", false, false, true);
        $this->callbackQueue = $callbackQueue->queue;

        $this->channel->consume(function (Message $message) {
            $this->handleMessage($message);
        }, $this->callbackQueue);
    }

    /**
     * Resolves the pending call that matches the response's correlation id.
     *
     * @param Message $message
     * @throws \RuntimeException when the response has no correlation id or an unknown one
     */
    private function handleMessage(Message $message)
    {
        $correlationId = $message->getHeader("correlation-id");

        if (!isset($correlationId)) {
            throw new \RuntimeException("Server returned response without correlation-id.");
        }

        if (!isset($this->callbacks[$correlationId])) {
            throw new \RuntimeException("Server responded with unknown correlation-id: '{$correlationId}'.");
        }

        // Drop the entry before resolving: a finished call must not stay in the
        // map (otherwise it grows by one Deferred per call for the lifetime of
        // the client), and a duplicate response is then reported as unknown.
        $deferred = $this->callbacks[$correlationId];
        unset($this->callbacks[$correlationId]);

        $deferred->resolve($message->content);
        $this->channel->ack($message);
    }

    /**
     * Sends a request to the server queue.
     *
     * @param string $body
     * @return PromiseInterface resolved with the response body
     * @throws \LogicException when init() has not been called yet
     */
    public function call($body)
    {
        // Without a callback queue there is no "reply-to" address, so the
        // returned promise could never be resolved.
        if ($this->callbackQueue === null) {
            throw new \LogicException("RPCClient::init() must be called before RPCClient::call().");
        }

        $correlationId = uniqid("", true);

        $this->channel->publish($body, [
            "correlation-id" => $correlationId,
            "reply-to" => $this->callbackQueue,
        ], "", $this->serverQueue);

        $deferred = new Deferred();
        $this->callbacks[$correlationId] = $deferred;

        return $deferred->promise();
    }
}
// Client entry point: sends the first command-line argument to "great_server"
// and prints the response.

// Fail early with a usage hint instead of publishing a null body (and raising
// an undefined-offset notice) when no request argument was given.
if (!isset($_SERVER["argv"][1])) {
    file_put_contents("php://stderr", "usage: php " . basename(__FILE__) . " <request>\n");
    exit(1);
}

$client = new Client();
$client->connect();
$channel = $client->channel();

$rpcClient = new RPCClient($channel, "great_server");
$rpcClient->init();

$rpcClient->call($_SERVER["argv"][1])->then(function ($response) use ($client) {
    echo "server responded with: '{$response}'\n";
    // Only one call is made, so leave the run() loop once it is answered.
    $client->stop();
});

$client->run();
<?php
namespace Example;
use Bunny\Channel;
use Bunny\Client;
use Bunny\Message;
require_once __DIR__ . "/vendor/autoload.php";
/**
 * RPC server on top of a Bunny channel.
 *
 * Consumes requests from the server queue, passes each request body to the
 * handler and publishes the handler's result to the queue named in the
 * request's "reply-to" header, echoing back its "correlation-id".
 */
class RPCServer
{
    /** @var Channel */
    private $channel;

    /** @var string name of the queue requests are consumed from */
    private $serverQueue;

    /** @var callable receives the request body, returns the response body */
    private $handler;

    /**
     * @param Channel $channel
     * @param string $serverQueue
     * @param callable $handler
     */
    public function __construct(Channel $channel, $serverQueue, callable $handler)
    {
        $this->channel = $channel;
        $this->serverQueue = $serverQueue;
        $this->handler = $handler;
    }

    /**
     * Declares the server queue and starts consuming requests from it.
     */
    public function init()
    {
        $this->channel->queueDeclare($this->serverQueue, false, true);

        $this->channel->consume(function (Message $message) {
            $this->handleMessage($message);
        }, $this->serverQueue);
    }

    /**
     * Runs the handler for one request and replies to its sender.
     *
     * @param Message $message
     */
    private function handleMessage(Message $message)
    {
        try {
            $handler = $this->handler;
            $response = $handler($message->content);
        } catch (\Exception $e) {
            // Best effort: a failing handler still yields an (empty) reply so
            // the caller is not left waiting.
            $response = "";
        }

        // A request without a reply address cannot be answered; publishing
        // with a null routing key would only emit an unroutable message.
        $replyTo = $message->getHeader("reply-to");
        if (isset($replyTo) && $replyTo !== "") {
            $this->channel->publish($response, [
                "correlation-id" => $message->getHeader("correlation-id"),
            ], "", $replyTo);
        }

        // Ack in every case so the request is not delivered again.
        $this->channel->ack($message);
    }
}
// Server entry point: answers every request on "great_server" with the
// request text followed by " is great".

// Request handler: logs the incoming body and builds the reply.
$greatHandler = function ($request) {
    echo "handling request: '{$request}'\n";

    return "{$request} is great";
};

$client = new Client();
$client->connect();

$rpcServer = new RPCServer($client->channel(), "great_server", $greatHandler);
$rpcServer->init();

// Hand control to the client's run() loop to start processing requests.
$client->run();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment