Skip to content

Instantly share code, notes, and snippets.

@leeyisoft
Last active August 31, 2017 08:18
Show Gist options
  • Save leeyisoft/dce20647985f25de54f46e90695703bc to your computer and use it in GitHub Desktop.
Save leeyisoft/dce20647985f25de54f46e90695703bc to your computer and use it in GitHub Desktop.
php rpc client (rabbitmq)
<?php
/**
* @desc Advanced Message Queuing Protocol RPC Client
*
* @name RpcClient
* @copyright leeyi.net @ 2014-2020
* @author leeyi <leeyisoft@icloud.com>
*/
namespace app\lib\AMQP;
class RpcClient extends Rabbit {
private $queue_name = 'rpc_queue';
/**
* 构造函数,初始化数据库链接等
* @author leeyi <leeyisoft@icloud.com>
*/
public function __construct($options=array()) {
$amqp = \Yii::$app->params['amqp'];
if( empty($amqp) ) {
exit('not config rabbitmq');
}
$this->connection = $this->connect($options);
$this->channel = new \AMQPChannel($this->connection);
}
public function call($service, $data) {
if (empty($service)) {
$this->error = '必须指定调用的服务';
return false;
}
if (!is_array($data)) {
$this->error = '参数必须是一个数组';
return false;
}
$data['service'] = $service;
$this->response = NULL;
$this->corrId = uniqid();
try {
//Declare an nonymous channel
$this->queue = new \AMQPQueue($this->channel);
$this->queue->setFlags(AMQP_EXCLUSIVE);
$this->queue->declareQueue();
$this->callbackQueueName = $this->queue->getName();
//Set Publish Attributes
$attributes = [];
$attributes['content_type'] = 'application/json';
$attributes['content_encoding'] = 'utf-8';
$attributes['correlation_id'] = $this->corrId;
$attributes['reply_to'] = $this->callbackQueueName;
$exchange = new \AMQPExchange($this->channel);
$exchange->publish(
json_encode($data),
$this->queue_name,
AMQP_NOPARAM,
$attributes
);
$callback = function(\AMQPEnvelope $message, \AMQPQueue $q) {
if($message->getCorrelationId() == $this->corrId) {
// echo sprintf("CorrelationID: %s",$message->getCorrelationId()), PHP_EOL;
// echo sprintf("Response: %s",$message->getBody()), PHP_EOL;
$this->response = $message->getBody();
$q->nack($message->getDeliveryTag());
return false;
}
};
$this->queue->consume($callback);
//Return RPC Results
return $this->response;
} catch(AMQPQueueException $ex) {
// print_r($ex);
$this->error = $ex;
return false;
} catch(Exception $ex) {
// print_r($ex);
$this->error = $ex;
return false;
}
}
}
<?php
/**
* ./yii test/rpc
*/
namespace app\commands;
use app\lib\AMQP\RpcClient;
class TestController extends Controller {
public function actionIndex() {
phpinfo();die;
echo "console test/index!\n";
return 0;
}
public function actionRpc() {
$_rpc = new RpcClient();
$response = $_rpc->call($service='qqbot', ['port'=>8187, 'cmd'=>'send/buddy/翠翠/西瓜吃\n🍉了']);
// $response = $_rpc->call($service='qqbot', ['port'=>8187, 'cmd'=>'list/no_exit']);
// $response = $_rpc->call($service='qqbot', ['port'=>8187, 'cmd'=>'list/group']);
// $response = $_rpc->call($service='qqbot', ['port'=>8187, 'cmd'=>'send/buddy/2651857907/用rabbitmq de RpcClient fasong de 数据']);
// $response = $_rpc->call($service='qqbot', ['exp'=>'fib(20)']);
// $response = $_rpc->call($service='qqbot', ['exp'=>'1+2']);
// $response = $_rpc->call($service='qqbot', ['exp'=>'abc(20)']);
$response = $_rpc->call($service='wxpy', [
'port'=>8111
, 'cmd'=>'list/buddy'
, 'params'=>'{"keywords":"", "sex":2, "city":"深圳", "province":"广东"}'
, 'params'=>'{"keywords":"", "province":"广东"}'
]);
var_dump(json_decode($response, true));
var_dump(json_decode($response, true));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment