Created
June 2, 2015 21:00
-
-
Save jakubkulhan/d761346f29cf20d1677f to your computer and use it in GitHub Desktop.
BunnyPHP RPC example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
namespace Example; | |
use Bunny\Channel; | |
use Bunny\Client; | |
use Bunny\Message; | |
use React\Promise\Deferred; | |
use React\Promise\PromiseInterface; | |
require_once __DIR__ . "/vendor/autoload.php"; | |
class RPCClient | |
{ | |
/** @var Channel */ | |
private $channel; | |
/** @var string */ | |
private $serverQueue; | |
/** @var string */ | |
private $callbackQueue; | |
/** @var Deferred[] */ | |
private $callbacks = []; | |
public function __construct(Channel $channel, $serverQueue) | |
{ | |
$this->channel = $channel; | |
$this->serverQueue = $serverQueue; | |
} | |
public function init() | |
{ | |
$this->channel->queueDeclare($this->serverQueue, false, true); | |
$callbackQueue = $this->channel->queueDeclare("", false, false, true); | |
$this->callbackQueue = $callbackQueue->queue; | |
$this->channel->consume(function (Message $message) { | |
$this->handleMessage($message); | |
}, $this->callbackQueue); | |
} | |
private function handleMessage(Message $message) | |
{ | |
$correlationId = $message->getHeader("correlation-id"); | |
if (!isset($correlationId)) { | |
throw new \RuntimeException("Server returned response without correlation-id."); | |
} | |
if (!isset($this->callbacks[$correlationId])) { | |
throw new \RuntimeException("Server responded with unknown correlation-id: '{$correlationId}'."); | |
} | |
$this->callbacks[$correlationId]->resolve($message->content); | |
$this->channel->ack($message); | |
} | |
/** | |
* @param string $body | |
* @return PromiseInterface | |
*/ | |
public function call($body) | |
{ | |
$correlationId = uniqid("", true); | |
$this->channel->publish($body, [ | |
"correlation-id" => $correlationId, | |
"reply-to" => $this->callbackQueue, | |
], "", $this->serverQueue); | |
$this->callbacks[$correlationId] = new Deferred(); | |
return $this->callbacks[$correlationId]->promise(); | |
} | |
} | |
$client = new Client(); | |
$client->connect(); | |
$channel = $client->channel(); | |
$rpcClient = new RPCClient($channel, "great_server"); | |
$rpcClient->init(); | |
$rpcClient->call($_SERVER["argv"][1])->then(function ($response) use ($client) { | |
echo "server responded with: '{$response}'\n"; | |
$client->stop(); | |
}); | |
$client->run(); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
namespace Example; | |
use Bunny\Channel; | |
use Bunny\Client; | |
use Bunny\Message; | |
require_once __DIR__ . "/vendor/autoload.php"; | |
class RPCServer | |
{ | |
/** @var Channel */ | |
private $channel; | |
/** @var string */ | |
private $serverQueue; | |
/** @var callable */ | |
private $handler; | |
public function __construct(Channel $channel, $serverQueue, callable $handler) | |
{ | |
$this->channel = $channel; | |
$this->serverQueue = $serverQueue; | |
$this->handler = $handler; | |
} | |
public function init() | |
{ | |
$this->channel->queueDeclare($this->serverQueue, false, true); | |
$this->channel->consume(function (Message $message) { | |
$this->handleMessage($message); | |
}, $this->serverQueue); | |
} | |
private function handleMessage(Message $message) | |
{ | |
try { | |
$handler = $this->handler; | |
$response = $handler($message->content); | |
} catch (\Exception $e) { | |
$response = ""; | |
} | |
$this->channel->publish($response, [ | |
"correlation-id" => $message->getHeader("correlation-id"), | |
], "", $message->getHeader("reply-to")); | |
$this->channel->ack($message); | |
} | |
} | |
$client = new Client(); | |
$client->connect(); | |
$channel = $client->channel(); | |
$rpcServer = new RPCServer($channel, "great_server", function ($request) { | |
echo "handling request: '{$request}'\n"; | |
return $request . " is great"; | |
}); | |
$rpcServer->init(); | |
$client->run(); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment