Skip to content

Instantly share code, notes, and snippets.

@fordnox
Created June 9, 2015 13:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fordnox/fa41e1233a207ec5416c to your computer and use it in GitHub Desktop.
Save fordnox/fa41e1233a207ec5416c to your computer and use it in GitHub Desktop.
RabbitRpc.php
<?php
use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;
/**
 * Blocking JSON RPC client on top of RabbitMQ (php-amqplib).
 *
 * The constructor opens a connection, declares a server-named reply queue and
 * starts consuming from it. call() publishes one request carrying a fresh
 * correlation id and blocks until the matching reply arrives or waiting fails.
 *
 * NOTE: call() closes the channel and the connection before returning, so an
 * instance can serve exactly one call()/callOnServer() invocation.
 */
class RabbitRpc
{
    /** Seconds a single wait for the reply may block before the call gives up. */
    const TIMEOUT = 15;

    /** @var AMQPConnection Broker connection opened in the constructor. */
    private $connection;

    /** @var mixed Channel obtained from the connection; used for publish and consume. */
    private $channel;

    /** @var string Name of the reply queue, sent to the server as `reply_to`. */
    private $callback_queue;

    /** @var string|null Body of the matching reply, or null while none has arrived. */
    private $response;

    /** @var string|null Correlation id of the request currently in flight. */
    private $corr_id;

    /**
     * Connects to the broker and subscribes to a private reply queue.
     *
     * @param array $config Connection settings; the keys read are
     *                      'host', 'port', 'user', 'pass' and 'vhost'.
     */
    public function __construct($config)
    {
        $this->connection = new AMQPConnection(
            $config['host'],
            $config['port'],
            $config['user'],
            $config['pass'],
            $config['vhost']
        );
        $this->channel = $this->connection->channel();

        // Empty name => the broker generates one. Positional flags follow
        // php-amqplib's queue_declare(): passive=false, durable=false,
        // exclusive=true, auto_delete=true.
        list($this->callback_queue, ,) = $this->channel->queue_declare("", false, false, true, true);

        // Positional flags follow php-amqplib's basic_consume():
        // consumer_tag='', no_local=false, no_ack=false, exclusive=false, nowait=false.
        $this->channel->basic_consume(
            $this->callback_queue,
            '',
            false,
            false,
            false,
            false,
            array($this, 'on_response')
        );
    }

    /**
     * Consumer callback: stores the reply body when it belongs to the pending request.
     *
     * Messages carrying any other correlation id are ignored.
     *
     * @param AMQPMessage $rep Message delivered on the reply queue.
     */
    public function on_response(AMQPMessage $rep)
    {
        // Strict comparison on purpose: uniqid() yields hex strings, and two
        // different ids that both look numeric (e.g. "5576e12345678") are
        // compared as floats by `==` and can be reported equal.
        if ((string)$rep->get('correlation_id') === (string)$this->corr_id) {
            $this->response = $rep->body;
        }
    }

    /**
     * Calls the RPC queue of one server and unwraps the JSON reply.
     *
     * The request is routed to "<type>.<server_id>.rpc". The reply is expected
     * to be a JSON object holding either an 'error' entry or a 'result' entry.
     *
     * @param mixed  $server_id Identifier embedded in the routing key.
     * @param array  $params    Request payload; sent JSON-encoded.
     * @param string $type      Routing key prefix, 'server' by default.
     *
     * @return mixed Value of the reply's 'result' entry, or null when absent.
     *
     * @throws Exception When no reply was received, the reply is not a JSON
     *                   object/array, or the reply reports an error.
     */
    public function callOnServer($server_id, array $params, $type = 'server')
    {
        $json = $this->call(json_encode($params), $type . '.' . $server_id . '.rpc');
        if (is_null($json)) {
            throw new Exception('#145621 Invalid response from server #' . $server_id);
        }

        $result = json_decode($json, true);
        if (!is_array($result)) {
            // Undecodable or scalar reply: report it instead of silently returning null.
            throw new Exception('#145621 Invalid response from server #' . $server_id);
        }

        if (!empty($result['error'])) {
            $error = $result['error'];
            if (is_array($error) && isset($error['message']) && is_scalar($error['message'])) {
                $message = (string)$error['message'];
            } elseif (is_scalar($error)) {
                $message = (string)$error;
            } else {
                // Unexpected error shape: keep whatever the server sent visible.
                $message = (string)json_encode($error);
            }
            throw new Exception($message);
        }

        return array_key_exists('result', $result) ? $result['result'] : null;
    }

    /**
     * Publishes one request and blocks until its reply arrives.
     *
     * The message is published to the default exchange with the given routing
     * key. Each wait on the channel is limited to self::TIMEOUT seconds; when
     * waiting throws, the message is written to the error log and the call
     * gives up. The channel and connection are closed before returning.
     *
     * @param mixed  $json        Request body; cast to string before sending.
     * @param string $routing_key Routing key the request is published with.
     *
     * @return string|null Reply body, or null when no matching reply was received.
     */
    public function call($json, $routing_key)
    {
        $this->response = null;
        $this->corr_id = uniqid();

        $msg = new AMQPMessage(
            (string)$json,
            array(
                'content_type' => 'application/json',
                'delivery_mode' => 2,
                'correlation_id' => $this->corr_id,
                'reply_to' => $this->callback_queue,
            )
        );
        $this->channel->basic_publish($msg, '', $routing_key);

        // Compare against null explicitly: a legitimate reply body such as ""
        // or "0" is falsy and would otherwise keep the loop waiting.
        while ($this->response === null) {
            try {
                $this->channel->wait(null, false, self::TIMEOUT);
            } catch (Exception $exception) {
                error_log($exception->getMessage());
                break;
            }
        }

        $this->channel->close();
        $this->connection->close();

        return $this->response;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment