Skip to content

Instantly share code, notes, and snippets.

@leeyisoft
Last active August 23, 2017 01:30
Show Gist options
  • Save leeyisoft/7d1098097a04cfeaad32429036d29450 to your computer and use it in GitHub Desktop.
Save leeyisoft/7d1098097a04cfeaad32429036d29450 to your computer and use it in GitHub Desktop.
PHP rabbitmq producer for yii2
<?php
/**
* Advanced Message Queuing Protocol 发送消息类
*
* @copyright leeyi.net @ 2014-2020
* @author leeyi <leeyisoft@icloud.com>
*/
namespace app\lib\AMQP;
class Pusher {
static public $msg_check_key_prefix = 'push_to_mq:';
static private $error = '';
/**
* 获取最近一次错误
*/
static public function getError() {
return self::$error;
}
/**
* 短信 发送MQ静态方法
*
* @param string $moible 接受短信的手机号码
* @param string $sms_msg 短信内容,内容格式为,字符串,或者json {"temp_name":"temp_name","params":{"key1":"val1","key2":"val2"}}
* @param string $msg_platform 消息平台,指定用哪个平台发送消息
* @param int $related_uid 关联业务UID
* @param int $related_row_id 关联业务ID 主要用该值保证消息的唯一性
* @param datetime $msg_datetime 定时时间,为空时表示立即发送(选填) 格式:yyyy-MM-dd HH:mm:ss
* @param string $related_table 关联数据库表名称
*
* @access public
* @author leeyi <leeyisoft@icloud.com>
* @return string
*/
static public function pushSMSToMQ($mobile=''
, $sms_msg=''
, $msg_platform=''
, $related_uid=0
, $related_row_id=0
, $msg_datetime=''
, $related_table=''
) {
self::$error = '';
if(
!self::isMobile($mobile)
|| empty($sms_msg)
|| empty($msg_platform)
|| $related_uid<0
|| $related_row_id<0
) {
self::$error = '参数不合法';
return false;
}
$msg = [];
$msg['msg_platform'] = $msg_platform;
$msg['msg_datetime'] = $msg_datetime;
$msg['mobile'] = $mobile;
$msg['related_uid'] = intval($related_uid);
$msg['related_row_id'] = intval($related_row_id);
$msg['related_table'] = strval($related_table);
$msg_md5 = md5(json_encode($msg));
// sms_msg 不参与md5,因为它很有可能附带时间参数,导致md5结果每次都会变化
$msg['sms_msg'] = $sms_msg;
$message = ['msg_md5'=>$msg_md5, 'msg'=>$msg];
$res = self::checkMsg($msg_md5);
if( $res ) {
self::$error = '已经发送过了,请不要重复请求';
return false;
}
$mq = new Rabbit();
$res = $mq->sending($queue_name='push.sms', $message, $check_rule_time=time() );
if( $res ) {
$cache_key = self::$msg_check_key_prefix.$msg_md5;
//set方法的第一个参数是我们的数据对应的key值,方便我们获取到
//第二个参数即是我们要缓存的数据
//第三个参数是缓存时间,如果是0,意味着永久缓存。默认是0
\Yii::$app->cache->set($cache_key, 1, 86400);
}
return $res;
}
/**
* Email 发送MQ静态方法
*
* @param string $to_email 收件人Email地址
* @param string $name 收件人称呼
* @param string $msg_title 消息标题
* @param string $msg_body 消息内容
* @param int $related_uid 关联业务UID
* @param int $related_row_id 关联业务ID 主要用该值保证消息的唯一性
*
* @access public
* @author leeyi <leeyisoft@icloud.com>
* @return string
*/
static public function pushEmailToMQ($to_email
, $name
, $msg_title
, $msg_body
, $related_uid=0
, $related_row_id=0
, $related_table=''
) {
self::$error = '';
if(
!self::isEmail($to_email)
|| empty($msg_title)
|| empty($msg_body)
|| $related_uid<0
|| $related_row_id<0
) {
self::$error = '参数不合法';
return false;
}
$msg = [];
$msg['to_email'] = $to_email;
$msg['name'] = $name;
$msg['msg_title'] = $msg_title;
$msg['related_uid'] = intval($related_uid);
$msg['related_row_id'] = intval($related_row_id);
$msg['related_table'] = strval($related_table);
$msg['msg_platform'] = 'email';
$msg_md5 = md5(json_encode($msg));
// msg_body 不参与md5,因为它很有可能附带时间参数,导致md5结果每次都会变化
$msg['msg_body'] = $msg_body;
$message = ['msg_md5'=>$msg_md5, 'msg'=>$msg];
$res = self::checkMsg($msg_md5);
if( $res ) {
self::$error = '已经发送过了,请不要重复请求';
return false;
}
$mq = new Rabbit();
$res = $mq->sending($queue_name='push.email', $message, $check_rule_time=time() );
if( $res ) {
$cache_key = self::$msg_check_key_prefix.$msg_md5;
//set方法的第一个参数是我们的数据对应的key值,方便我们获取到
//第二个参数即是我们要缓存的数据
//第三个参数是缓存时间,如果是0,意味着永久缓存。默认是0
\Yii::$app->cache->set($cache_key, 1, 86400);
}
return $res;
}
/**
* Jpush 发送MQ静态方法
*
* @param string $title 推送内容标题
* @param string $content 推送内容
* @param array $related_uids 关联业务UID 没有需要设置为 array(0)
* @param int $related_row_id 关联业务ID 主要用该值保证消息的唯一性
*
* @access public
* @author leeyi <leeyisoft@icloud.com>
* @return string
*/
static public function jpushToMQ($title
, $content
, $related_row_id=0
, $related_uids=[]
, $platform='afd'
) {
self::$error = '';
if(
empty($content)
|| !is_array($related_uids)
|| $related_row_id<0
) {
self::$error = '参数不合法';
return false;
}
$msg = [];
$msg['title'] = $title;
$msg['related_uids'] = $related_uids;
$msg['related_row_id'] = intval($related_row_id);
$msg['platform'] = $platform;
$msg_md5 = md5(json_encode($msg));
// content 不参与md5,因为它很有可能附带时间参数,导致md5结果每次都会变化
$msg['content'] = $content;
$msg['msg_platform'] = 'jpush';
$message = array('msg_md5' => $msg_md5, 'msg' => $msg);
$res = self::checkMsg($msg_md5);
if( $res ) {
self::$error = '已经发送过了,请不要重复请求';
return false;
}
$mq = new Rabbit();
$res = $mq->sending($queue_name='push.jpush', $message, $check_rule_time=time() );
if( $res ) {
$cache_key = self::$msg_check_key_prefix.$msg_md5;
//set方法的第一个参数是我们的数据对应的key值,方便我们获取到
//第二个参数即是我们要缓存的数据
//第三个参数是缓存时间,如果是0,意味着永久缓存。默认是0
\Yii::$app->cache->set($cache_key, 1, 86400);
}
return $res;
}
/**
* 检查消息是否发送过
* @param string $msg_md5 32位 md5,消息唯一标示
*
* @access private
* @author leeyi <leeyisoft@icloud.com>
* @return boolean [true|false] 返回true 标示已经发送过消息了,不需要在发送了
*/
static private function checkMsg($msg_md5) {
$cache_key = self::$msg_check_key_prefix.$msg_md5;
$count = \Yii::$app->cache->get($cache_key);
return $count>0 ? true : false;
}
static private function isMobile($mobile) {
return preg_match("/^1\d{10}$/", $mobile);
}
static private function isEmail($email) {
$pattern = "/^([0-9A-Za-z\\-_\\.]+)@([0-9a-z]+\\.[a-z]{2,3}(\\.[a-z]{2})?)$/i";
return preg_match($pattern, $email);
}
}
<?php
/**
* @desc Advanced Message Queuing Protocol 发送消息类
*
* @name Rabbit
* @copyright leeyi.net @ 2014-2020
* @author leeyi <leeyisoft@icloud.com>
*/
namespace app\lib\AMQP;
/**
* 检查环境是否支持
*/
extension_loaded("amqp") or die("not extension amqp!");
class Rabbit {
/**
* rabbitmq链接
* @name $connection
*/
public $connection=false;
public $channel=false;
public $error = '';
/**
* 构造函数,初始化数据库链接等
* @author leeyi <leeyisoft@icloud.com>
*/
public function __construct($options=array()) {
$amqp = \Yii::$app->params['amqp'];
if( empty($amqp) ) {
exit('not config rabbitmq');
}
$this->connection = $this->connect($options);
$this->channel = new \AMQPChannel($this->connection);
}
/**
* 获取最近一次错误
*/
public function getError() {
return $this->error;
}
/**
* 持久化的消息发送
* @access public
* @param string $queue_name 触发点标识(这里作为队列名称)
* @param array $message 消息主体,一个多维数组,只包含元素 msg_md5 和 msg 两个元素,msg 可以是任意数组
* @param int $check_rule_time 触发点检查“奖励规则的当前时间“,作为 mq queue 的 timestamp attr
* @return true
* @author leeyi <leeyisoft@icloud.com>
*/
public function sending($queue_name, $message, $check_rule_time) {
/**
* 下面的 message 必须是 包含 msg_md5 和 msg 元素的数组,不然后端程序不会处理该消息
* msg_md5 消息身份标识
*/
if(
!isset($message['msg_md5']) ||
empty($message['msg_md5']) ||
!isset($message['msg']) ||
$check_rule_time<1
) {
return false;
}
$direct_exchange_name = 'direct.exchange.'.$queue_name;
$routingkey_name = 'routingkey.'.$queue_name;
$queue_name = 'queue.'.$queue_name;
//创建exchange名称和类型
$exchange = new \AMQPExchange($this->channel);
$exchange->setName($direct_exchange_name);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->setFlags(AMQP_DURABLE); // 持久化
$exchange->declareExchange();
//创建queue名称,使用exchange,绑定routingkey
$queue = new \AMQPQueue($this->channel);
$queue->setName($queue_name);
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();
$queue->bind($direct_exchange_name, $routingkey_name);
$attrs = [];
$attrs['content_type'] = 'application/json';
$attrs['content_encoding'] = 'utf-8';
$attrs['timestamp'] = $check_rule_time;
// $attrs['delivery_mode'] = '2'; // 持久化消息
// 消息发布
// $flag=AMQP_MANDATORY 在发布消息时,消息必须被路由到一个有效的队列中。如果不是,将返回一个错误。
$this->channel->startTransaction();
$exchange->publish(json_encode($message), $routingkey_name, $flag=AMQP_MANDATORY, $attrs);
$this->channel->commitTransaction();
return true;
}
/**
* 链接rabbitmq 只运行在够着函数使用
* @param $options['host'] string 服务地址
* @param $options['port'] int 端口
* @param $options['login'] string 登录账号
* @param $options['password'] string 账号密码
* @param $options['vhost'] string 虚拟站点
* @access protected
* @author leeyi <leeyisoft@icloud.com>
* @return $connect
*/
protected function connect($options=[]) {
//连接RabbitMQ
$amqp = \Yii::$app->params['amqp'];
$options = array_merge( [
'host'=>$amqp['host'] ,
'port'=> $amqp['port'],
'vhost'=>$amqp['vhost'],
'login'=>$amqp['user'],
'password'=>$amqp['password'],
], (array)$options);
$connection = new \AMQPConnection($options);
$connection->connect();
return $connection;
}
/**
* 析构函数
* @access public
*/
public function __destruct() {
// 断开链接
$this->connection && $this->connection->disconnect();
}
}
<?php
/**
* ./yii test/rpc
* ./yii test/sms
* ./yii test/email
*/
namespace app\commands;
use app\lib\AMQP\Pusher;
use app\lib\AMQP\RpcClient;
class TestController extends Controller {
public function actionIndex() {
phpinfo();die;
echo "console test/index!\n";
return 0;
}
public function actionRpc() {
$_rpc = new RpcClient();
$response = $_rpc->call($service='qqbot', ['port'=>8187, 'cmd'=>'send/buddy/翠翠/西瓜吃\n🍉了']);
// $response = $_rpc->call($service='qqbot', ['port'=>8187, 'cmd'=>'list/no_exit']);
// $response = $_rpc->call($service='qqbot', ['port'=>8187, 'cmd'=>'list/group']);
// $response = $_rpc->call($service='qqbot', ['port'=>8187, 'cmd'=>'send/buddy/2651857907/用rabbitmq de RpcClient fasong de 数据']);
// $response = $_rpc->call($service='qqbot', ['exp'=>'fib(20)']);
// $response = $_rpc->call($service='qqbot', ['exp'=>'1+2']);
// $response = $_rpc->call($service='qqbot', ['exp'=>'abc(20)']);
var_dump(json_decode($response, true));
}
public function actionEmail() {
$msg_body = '<div style="font-size: 17px;color: #333;font-family:\'Microsoft Yahei\';"> <p>文|好孕姐&nbsp;</p><p>昨天,肚子里宝宝的反应太可爱了,先是用力动了好几下,可能是在提示我们赶紧休战。后来发现没用,居然用小脚丫连踢带踹的,肚皮上一瞬间出现好几处凸起。我赶紧跟她说妈妈没骂你,是骂爸爸呢,她就不动了。</p><p><img src="https://rescdn.qqmail.com/dyimg/20170820/7DED47BA699.jpeg" style="width: 686px; height: auto;"></p><p>哈哈,好聪明的宝贝呀。难怪最近好多孕妈妈说只要和老公吵架,胎动就特别频繁。原来宝宝在肚子里的时候就能感知到妈妈的情绪,也会小小地担心妈妈会训他呢。&nbsp;</p><p>那么问题来了,孕期经常吵架对胎宝宝的影响有多大呢?</p><p><img src="https://rescdn.qqmail.com/dyimg/20170820/76B3904D630.jpeg" style="width: 686px; height: auto;"></p><p>一.影响胎儿发育&nbsp;</p><p>怀孕期间,孕妈妈的情绪起伏比较大,本来就很容易生气、发脾气。如果老公还不知道体贴照顾,夫妻两个很容易发生激烈的争吵。我们知道当人处于极端愤怒的状态时,血管会出现收缩,血流加快,内分泌也会发生变化,分泌一些有害的激素。这些变化都会直接危害到胎儿的发育。&nbsp;</p><p>还有一点需要意识到的是吵架时,说话声音一般较大,胎宝宝会感觉到害怕。</p><p><img src="https://rescdn.qqmail.com/dyimg/20170820/706AABC0F1B.jpeg" style="width: 686px; height: auto;"></p><p>二.出生后容易出现心理缺陷&nbsp;</p><p>研究发现,怀孕期间,如果夫妻两个的感情不好,经常吵架打架,孩子出生后很容易存在身心缺陷。不仅发育缓慢,而且会由于恐惧心理导致神经质,性格方面也不太好,遇到事情后比较容易胆小怯弱,生活独立性差。&nbsp;</p><p>所以不管是为了孕妈妈,还是为了孩子,怀孕期间都应该尽量减少争执。孕妈要学会控制自己的情绪,宝爸也要多体贴一点,懂得分担孕妈妈的痛苦和压力。当出现矛盾的时候,要理智地分析,互相理解。</p><p>-----------------------</p><p>怎么备孕成功率高?如何缓解孕吐?孕期可以xxoo吗?孕期感冒、发烧要不要吃药?预产期到了还没动静,闹哪样?月子里的那些规矩有没有科学依据?</p><p>如果你有这些困惑,</p></div>';
$res = Pusher::pushEmailToMQ($to_email="leeyisoft@163.com"
, $name=''
, $msg_title='这是一份测试邮件'
, $msg_body=$msg_body
, $related_uid=0
, $related_row_id=time()
, $related_table=''
);
var_dump($res);
var_dump(Pusher::getError());
return 0;
}
public function actionSms() {
// var_dump(\Yii::$app->params['amqp']); exit();
echo "console test/amqp!\n";
$res = Pusher::pushSMSToMQ($mobile='13692177080'
// , $sms_msg='【运呗】验证码1234,您正在注册成为ooo用户,感谢您的支持!'
// , $msg_platform='clysms'
, $sms_msg='【运呗】aseqwrqwerqwe成立业短信平台 2.2.个性短信接口测试,感谢您的支持! 测试'
, $msg_platform='clyafd_sale'
, $related_uid=1
, $related_row_id=time()
, $msg_datetime=''
, $related_table=''
);
var_dump($res);
var_dump(Pusher::getError());
return 0;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment