Last active
August 23, 2017 01:30
-
-
Save leeyisoft/7d1098097a04cfeaad32429036d29450 to your computer and use it in GitHub Desktop.
PHP rabbitmq producer for yii2
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
/** | |
* Advanced Message Queuing Protocol 发送消息类 | |
* | |
* @copyright leeyi.net @ 2014-2020 | |
* @author leeyi <leeyisoft@icloud.com> | |
*/ | |
namespace app\lib\AMQP; | |
class Pusher { | |
static public $msg_check_key_prefix = 'push_to_mq:'; | |
static private $error = ''; | |
/** | |
* 获取最近一次错误 | |
*/ | |
static public function getError() { | |
return self::$error; | |
} | |
/** | |
* 短信 发送MQ静态方法 | |
* | |
* @param string $moible 接受短信的手机号码 | |
* @param string $sms_msg 短信内容,内容格式为,字符串,或者json {"temp_name":"temp_name","params":{"key1":"val1","key2":"val2"}} | |
* @param string $msg_platform 消息平台,指定用哪个平台发送消息 | |
* @param int $related_uid 关联业务UID | |
* @param int $related_row_id 关联业务ID 主要用该值保证消息的唯一性 | |
* @param datetime $msg_datetime 定时时间,为空时表示立即发送(选填) 格式:yyyy-MM-dd HH:mm:ss | |
* @param string $related_table 关联数据库表名称 | |
* | |
* @access public | |
* @author leeyi <leeyisoft@icloud.com> | |
* @return string | |
*/ | |
static public function pushSMSToMQ($mobile='' | |
, $sms_msg='' | |
, $msg_platform='' | |
, $related_uid=0 | |
, $related_row_id=0 | |
, $msg_datetime='' | |
, $related_table='' | |
) { | |
self::$error = ''; | |
if( | |
!self::isMobile($mobile) | |
|| empty($sms_msg) | |
|| empty($msg_platform) | |
|| $related_uid<0 | |
|| $related_row_id<0 | |
) { | |
self::$error = '参数不合法'; | |
return false; | |
} | |
$msg = []; | |
$msg['msg_platform'] = $msg_platform; | |
$msg['msg_datetime'] = $msg_datetime; | |
$msg['mobile'] = $mobile; | |
$msg['related_uid'] = intval($related_uid); | |
$msg['related_row_id'] = intval($related_row_id); | |
$msg['related_table'] = strval($related_table); | |
$msg_md5 = md5(json_encode($msg)); | |
// sms_msg 不参与md5,因为它很有可能附带时间参数,导致md5结果每次都会变化 | |
$msg['sms_msg'] = $sms_msg; | |
$message = ['msg_md5'=>$msg_md5, 'msg'=>$msg]; | |
$res = self::checkMsg($msg_md5); | |
if( $res ) { | |
self::$error = '已经发送过了,请不要重复请求'; | |
return false; | |
} | |
$mq = new Rabbit(); | |
$res = $mq->sending($queue_name='push.sms', $message, $check_rule_time=time() ); | |
if( $res ) { | |
$cache_key = self::$msg_check_key_prefix.$msg_md5; | |
//set方法的第一个参数是我们的数据对应的key值,方便我们获取到 | |
//第二个参数即是我们要缓存的数据 | |
//第三个参数是缓存时间,如果是0,意味着永久缓存。默认是0 | |
\Yii::$app->cache->set($cache_key, 1, 86400); | |
} | |
return $res; | |
} | |
/** | |
* Email 发送MQ静态方法 | |
* | |
* @param string $to_email 收件人Email地址 | |
* @param string $name 收件人称呼 | |
* @param string $msg_title 消息标题 | |
* @param string $msg_body 消息内容 | |
* @param int $related_uid 关联业务UID | |
* @param int $related_row_id 关联业务ID 主要用该值保证消息的唯一性 | |
* | |
* @access public | |
* @author leeyi <leeyisoft@icloud.com> | |
* @return string | |
*/ | |
static public function pushEmailToMQ($to_email | |
, $name | |
, $msg_title | |
, $msg_body | |
, $related_uid=0 | |
, $related_row_id=0 | |
, $related_table='' | |
) { | |
self::$error = ''; | |
if( | |
!self::isEmail($to_email) | |
|| empty($msg_title) | |
|| empty($msg_body) | |
|| $related_uid<0 | |
|| $related_row_id<0 | |
) { | |
self::$error = '参数不合法'; | |
return false; | |
} | |
$msg = []; | |
$msg['to_email'] = $to_email; | |
$msg['name'] = $name; | |
$msg['msg_title'] = $msg_title; | |
$msg['related_uid'] = intval($related_uid); | |
$msg['related_row_id'] = intval($related_row_id); | |
$msg['related_table'] = strval($related_table); | |
$msg['msg_platform'] = 'email'; | |
$msg_md5 = md5(json_encode($msg)); | |
// msg_body 不参与md5,因为它很有可能附带时间参数,导致md5结果每次都会变化 | |
$msg['msg_body'] = $msg_body; | |
$message = ['msg_md5'=>$msg_md5, 'msg'=>$msg]; | |
$res = self::checkMsg($msg_md5); | |
if( $res ) { | |
self::$error = '已经发送过了,请不要重复请求'; | |
return false; | |
} | |
$mq = new Rabbit(); | |
$res = $mq->sending($queue_name='push.email', $message, $check_rule_time=time() ); | |
if( $res ) { | |
$cache_key = self::$msg_check_key_prefix.$msg_md5; | |
//set方法的第一个参数是我们的数据对应的key值,方便我们获取到 | |
//第二个参数即是我们要缓存的数据 | |
//第三个参数是缓存时间,如果是0,意味着永久缓存。默认是0 | |
\Yii::$app->cache->set($cache_key, 1, 86400); | |
} | |
return $res; | |
} | |
/** | |
* Jpush 发送MQ静态方法 | |
* | |
* @param string $title 推送内容标题 | |
* @param string $content 推送内容 | |
* @param array $related_uids 关联业务UID 没有需要设置为 array(0) | |
* @param int $related_row_id 关联业务ID 主要用该值保证消息的唯一性 | |
* | |
* @access public | |
* @author leeyi <leeyisoft@icloud.com> | |
* @return string | |
*/ | |
static public function jpushToMQ($title | |
, $content | |
, $related_row_id=0 | |
, $related_uids=[] | |
, $platform='afd' | |
) { | |
self::$error = ''; | |
if( | |
empty($content) | |
|| !is_array($related_uids) | |
|| $related_row_id<0 | |
) { | |
self::$error = '参数不合法'; | |
return false; | |
} | |
$msg = []; | |
$msg['title'] = $title; | |
$msg['related_uids'] = $related_uids; | |
$msg['related_row_id'] = intval($related_row_id); | |
$msg['platform'] = $platform; | |
$msg_md5 = md5(json_encode($msg)); | |
// content 不参与md5,因为它很有可能附带时间参数,导致md5结果每次都会变化 | |
$msg['content'] = $content; | |
$msg['msg_platform'] = 'jpush'; | |
$message = array('msg_md5' => $msg_md5, 'msg' => $msg); | |
$res = self::checkMsg($msg_md5); | |
if( $res ) { | |
self::$error = '已经发送过了,请不要重复请求'; | |
return false; | |
} | |
$mq = new Rabbit(); | |
$res = $mq->sending($queue_name='push.jpush', $message, $check_rule_time=time() ); | |
if( $res ) { | |
$cache_key = self::$msg_check_key_prefix.$msg_md5; | |
//set方法的第一个参数是我们的数据对应的key值,方便我们获取到 | |
//第二个参数即是我们要缓存的数据 | |
//第三个参数是缓存时间,如果是0,意味着永久缓存。默认是0 | |
\Yii::$app->cache->set($cache_key, 1, 86400); | |
} | |
return $res; | |
} | |
/** | |
* 检查消息是否发送过 | |
* @param string $msg_md5 32位 md5,消息唯一标示 | |
* | |
* @access private | |
* @author leeyi <leeyisoft@icloud.com> | |
* @return boolean [true|false] 返回true 标示已经发送过消息了,不需要在发送了 | |
*/ | |
static private function checkMsg($msg_md5) { | |
$cache_key = self::$msg_check_key_prefix.$msg_md5; | |
$count = \Yii::$app->cache->get($cache_key); | |
return $count>0 ? true : false; | |
} | |
static private function isMobile($mobile) { | |
return preg_match("/^1\d{10}$/", $mobile); | |
} | |
static private function isEmail($email) { | |
$pattern = "/^([0-9A-Za-z\\-_\\.]+)@([0-9a-z]+\\.[a-z]{2,3}(\\.[a-z]{2})?)$/i"; | |
return preg_match($pattern, $email); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
/** | |
* @desc Advanced Message Queuing Protocol 发送消息类 | |
* | |
* @name Rabbit | |
* @copyright leeyi.net @ 2014-2020 | |
* @author leeyi <leeyisoft@icloud.com> | |
*/ | |
namespace app\lib\AMQP; | |
/** | |
* 检查环境是否支持 | |
*/ | |
extension_loaded("amqp") or die("not extension amqp!"); | |
class Rabbit { | |
/** | |
* rabbitmq链接 | |
* @name $connection | |
*/ | |
public $connection=false; | |
public $channel=false; | |
public $error = ''; | |
/** | |
* 构造函数,初始化数据库链接等 | |
* @author leeyi <leeyisoft@icloud.com> | |
*/ | |
public function __construct($options=array()) { | |
$amqp = \Yii::$app->params['amqp']; | |
if( empty($amqp) ) { | |
exit('not config rabbitmq'); | |
} | |
$this->connection = $this->connect($options); | |
$this->channel = new \AMQPChannel($this->connection); | |
} | |
/** | |
* 获取最近一次错误 | |
*/ | |
public function getError() { | |
return $this->error; | |
} | |
/** | |
* 持久化的消息发送 | |
* @access public | |
* @param string $queue_name 触发点标识(这里作为队列名称) | |
* @param array $message 消息主体,一个多维数组,只包含元素 msg_md5 和 msg 两个元素,msg 可以是任意数组 | |
* @param int $check_rule_time 触发点检查“奖励规则的当前时间“,作为 mq queue 的 timestamp attr | |
* @return true | |
* @author leeyi <leeyisoft@icloud.com> | |
*/ | |
public function sending($queue_name, $message, $check_rule_time) { | |
/** | |
* 下面的 message 必须是 包含 msg_md5 和 msg 元素的数组,不然后端程序不会处理该消息 | |
* msg_md5 消息身份标识 | |
*/ | |
if( | |
!isset($message['msg_md5']) || | |
empty($message['msg_md5']) || | |
!isset($message['msg']) || | |
$check_rule_time<1 | |
) { | |
return false; | |
} | |
$direct_exchange_name = 'direct.exchange.'.$queue_name; | |
$routingkey_name = 'routingkey.'.$queue_name; | |
$queue_name = 'queue.'.$queue_name; | |
//创建exchange名称和类型 | |
$exchange = new \AMQPExchange($this->channel); | |
$exchange->setName($direct_exchange_name); | |
$exchange->setType(AMQP_EX_TYPE_DIRECT); | |
$exchange->setFlags(AMQP_DURABLE); // 持久化 | |
$exchange->declareExchange(); | |
//创建queue名称,使用exchange,绑定routingkey | |
$queue = new \AMQPQueue($this->channel); | |
$queue->setName($queue_name); | |
$queue->setFlags(AMQP_DURABLE); | |
$queue->declareQueue(); | |
$queue->bind($direct_exchange_name, $routingkey_name); | |
$attrs = []; | |
$attrs['content_type'] = 'application/json'; | |
$attrs['content_encoding'] = 'utf-8'; | |
$attrs['timestamp'] = $check_rule_time; | |
// $attrs['delivery_mode'] = '2'; // 持久化消息 | |
// 消息发布 | |
// $flag=AMQP_MANDATORY 在发布消息时,消息必须被路由到一个有效的队列中。如果不是,将返回一个错误。 | |
$this->channel->startTransaction(); | |
$exchange->publish(json_encode($message), $routingkey_name, $flag=AMQP_MANDATORY, $attrs); | |
$this->channel->commitTransaction(); | |
return true; | |
} | |
/** | |
* 链接rabbitmq 只运行在够着函数使用 | |
* @param $options['host'] string 服务地址 | |
* @param $options['port'] int 端口 | |
* @param $options['login'] string 登录账号 | |
* @param $options['password'] string 账号密码 | |
* @param $options['vhost'] string 虚拟站点 | |
* @access protected | |
* @author leeyi <leeyisoft@icloud.com> | |
* @return $connect | |
*/ | |
protected function connect($options=[]) { | |
//连接RabbitMQ | |
$amqp = \Yii::$app->params['amqp']; | |
$options = array_merge( [ | |
'host'=>$amqp['host'] , | |
'port'=> $amqp['port'], | |
'vhost'=>$amqp['vhost'], | |
'login'=>$amqp['user'], | |
'password'=>$amqp['password'], | |
], (array)$options); | |
$connection = new \AMQPConnection($options); | |
$connection->connect(); | |
return $connection; | |
} | |
/** | |
* 析构函数 | |
* @access public | |
*/ | |
public function __destruct() { | |
// 断开链接 | |
$this->connection && $this->connection->disconnect(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
/** | |
* ./yii test/rpc | |
* ./yii test/sms | |
* ./yii test/email | |
*/ | |
namespace app\commands; | |
use app\lib\AMQP\Pusher; | |
use app\lib\AMQP\RpcClient; | |
class TestController extends Controller { | |
public function actionIndex() { | |
phpinfo();die; | |
echo "console test/index!\n"; | |
return 0; | |
} | |
public function actionRpc() { | |
$_rpc = new RpcClient(); | |
$response = $_rpc->call($service='qqbot', ['port'=>8187, 'cmd'=>'send/buddy/翠翠/西瓜吃\n🍉了']); | |
// $response = $_rpc->call($service='qqbot', ['port'=>8187, 'cmd'=>'list/no_exit']); | |
// $response = $_rpc->call($service='qqbot', ['port'=>8187, 'cmd'=>'list/group']); | |
// $response = $_rpc->call($service='qqbot', ['port'=>8187, 'cmd'=>'send/buddy/2651857907/用rabbitmq de RpcClient fasong de 数据']); | |
// $response = $_rpc->call($service='qqbot', ['exp'=>'fib(20)']); | |
// $response = $_rpc->call($service='qqbot', ['exp'=>'1+2']); | |
// $response = $_rpc->call($service='qqbot', ['exp'=>'abc(20)']); | |
var_dump(json_decode($response, true)); | |
} | |
public function actionEmail() { | |
$msg_body = '<div style="font-size: 17px;color: #333;font-family:\'Microsoft Yahei\';"> <p>文|好孕姐 </p><p>昨天,肚子里宝宝的反应太可爱了,先是用力动了好几下,可能是在提示我们赶紧休战。后来发现没用,居然用小脚丫连踢带踹的,肚皮上一瞬间出现好几处凸起。我赶紧跟她说妈妈没骂你,是骂爸爸呢,她就不动了。</p><p><img src="https://rescdn.qqmail.com/dyimg/20170820/7DED47BA699.jpeg" style="width: 686px; height: auto;"></p><p>哈哈,好聪明的宝贝呀。难怪最近好多孕妈妈说只要和老公吵架,胎动就特别频繁。原来宝宝在肚子里的时候就能感知到妈妈的情绪,也会小小地担心妈妈会训他呢。 </p><p>那么问题来了,孕期经常吵架对胎宝宝的影响有多大呢?</p><p><img src="https://rescdn.qqmail.com/dyimg/20170820/76B3904D630.jpeg" style="width: 686px; height: auto;"></p><p>一.影响胎儿发育 </p><p>怀孕期间,孕妈妈的情绪起伏比较大,本来就很容易生气、发脾气。如果老公还不知道体贴照顾,夫妻两个很容易发生激烈的争吵。我们知道当人处于极端愤怒的状态时,血管会出现收缩,血流加快,内分泌也会发生变化,分泌一些有害的激素。这些变化都会直接危害到胎儿的发育。 </p><p>还有一点需要意识到的是吵架时,说话声音一般较大,胎宝宝会感觉到害怕。</p><p><img src="https://rescdn.qqmail.com/dyimg/20170820/706AABC0F1B.jpeg" style="width: 686px; height: auto;"></p><p>二.出生后容易出现心理缺陷 </p><p>研究发现,怀孕期间,如果夫妻两个的感情不好,经常吵架打架,孩子出生后很容易存在身心缺陷。不仅发育缓慢,而且会由于恐惧心理导致神经质,性格方面也不太好,遇到事情后比较容易胆小怯弱,生活独立性差。 </p><p>所以不管是为了孕妈妈,还是为了孩子,怀孕期间都应该尽量减少争执。孕妈要学会控制自己的情绪,宝爸也要多体贴一点,懂得分担孕妈妈的痛苦和压力。当出现矛盾的时候,要理智地分析,互相理解。</p><p>-----------------------</p><p>怎么备孕成功率高?如何缓解孕吐?孕期可以xxoo吗?孕期感冒、发烧要不要吃药?预产期到了还没动静,闹哪样?月子里的那些规矩有没有科学依据?</p><p>如果你有这些困惑,</p></div>'; | |
$res = Pusher::pushEmailToMQ($to_email="leeyisoft@163.com" | |
, $name='' | |
, $msg_title='这是一份测试邮件' | |
, $msg_body=$msg_body | |
, $related_uid=0 | |
, $related_row_id=time() | |
, $related_table='' | |
); | |
var_dump($res); | |
var_dump(Pusher::getError()); | |
return 0; | |
} | |
public function actionSms() { | |
// var_dump(\Yii::$app->params['amqp']); exit(); | |
echo "console test/amqp!\n"; | |
$res = Pusher::pushSMSToMQ($mobile='13692177080' | |
// , $sms_msg='【运呗】验证码1234,您正在注册成为ooo用户,感谢您的支持!' | |
// , $msg_platform='clysms' | |
, $sms_msg='【运呗】aseqwrqwerqwe成立业短信平台 2.2.个性短信接口测试,感谢您的支持! 测试' | |
, $msg_platform='clyafd_sale' | |
, $related_uid=1 | |
, $related_row_id=time() | |
, $msg_datetime='' | |
, $related_table='' | |
); | |
var_dump($res); | |
var_dump(Pusher::getError()); | |
return 0; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment