Skip to content

Instantly share code, notes, and snippets.

@shanelau
Created July 6, 2016 06:48
Show Gist options
  • Save shanelau/a0b3e2e5cd116f16cd4ac98e08350fde to your computer and use it in GitHub Desktop.
Save shanelau/a0b3e2e5cd116f16cd4ac98e08350fde to your computer and use it in GitHub Desktop.
aliyun message queue service
var AliMNS = require("ali-mns");
var ENV = process.env;
var account = new AliMNS.Account(ENV.MQS_ACCOUNTID, ENV.MQS_KEY, ENV.MQS_SECURITY);
module.exports = {
MQActivity: new AliMNS.MQ(sails.config.mqs.queues.activity, account, ENV.MQS_REGION), // message queue client
MQElastic: new AliMNS.MQ(sails.config.mqs.queues.elastic, account, ENV.MQS_REGION) // message queue client
};
var MQSService = require('./MQSService');
/**
* 检测任务是否创建, 否则则创建
*/
function createSyncTask() {
MQSService.MQElastic.peekP(SYNC_TIME).then((tasks) => {
if (tasks) { // task build success then do nothing
return false;
}
return ;
}).then((err) => {
if (err.Error && err.Error.Code === 'MessageNotExist') {
sails.log.info('create search sync task');
MQSService.MQElastic.sendP(SYNC_TASK);
}
});
// 注册回调监听
MQSService.MQElastic.notifyRecv(function(err, message){
if(err && err.message === "NetworkBroken"){
sails.log.error('search sync error', err.stack);
return true;
}
runSync(); // 同步任务
MQSService.MQElastic.sendP(SYNC_TASK); // 任务处理完毕 ,再次注册任务
return true; // this will cause message to be deleted automatically
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment