// core redis-queue logic
const Promise = require('bluebird'); | |
const _ = require('lodash'); | |
const uuid = require('uuid'); | |
const C = require('../constant'); | |
const proxy = require('../proxy'); | |
const lib = require(process.env.lib); | |
const redis = lib.redis.createClient(C.QUEUE.REDIS); | |
const logger = lib.logger.createLogger('common-service-queue'); | |
// Public API of the queue module.
exports.pushTask = pushTask;
exports.onTopic = onTopic;
exports.scanDelayInterval = scanDelayInterval;
// Historical, misspelled name; kept so existing callers keep working.
exports.cleanBufferInverval = cleanBufferInverval;
// Correctly spelled alias for the same function.
exports.cleanBufferInterval = cleanBufferInverval;
/**
 * Register a task and schedule it to become due after `delay` seconds.
 *
 * In a single MULTI the serialized task is written to the pool hash, its
 * status is set to DELAY, its topic is recorded, and its id is added to the
 * delay sorted set scored by the due timestamp (epoch milliseconds).
 *
 * @param {*} [id] Task id; a uuid v4 is generated when falsy. Stringified.
 * @param {*} [topic] Topic name; 'default' when falsy. Stringified.
 * @param {number|string} [delay] Seconds until the task is due. An explicit
 *   zero (0 or a numeric-zero string) means "due immediately"; missing,
 *   blank or non-numeric values fall back to 30.
 * @param {number|string} [ttr] Seconds, stored on the task in milliseconds;
 *   popTask adds it to the current time when it re-schedules a popped task.
 *   Falls back to 30 when missing, non-numeric or 0.
 * @param {*} body Payload stored with the task (JSON-serialized).
 * @returns {Promise} Result of the MULTI/EXEC.
 */
function pushTask(id, topic, delay, ttr, body) {
  id = (id || uuid.v4()).toString();
  topic = (topic || 'default').toString();

  // `Number(delay) || 30` alone would turn an explicit 0 into 30, making an
  // immediately-due task impossible. Only a real zero bypasses the fallback;
  // every other input keeps its previous result.
  const delaySeconds = Number(delay);
  const explicitZero = delaySeconds === 0 && (
    typeof delay === 'number' ||
    (typeof delay === 'string' && delay.trim() !== '')
  );
  delay = explicitZero ? 0 : (delaySeconds || 30);
  ttr = Number(ttr) || 30;

  const task = {
    id: id,
    topic: topic,
    timestamp: Date.now() + delay * 1000,
    ttr: ttr * 1000,
    body: body
  };

  return redis.multi()
    .hset(C.QUEUE.REDIS_PREFIX.POOL, id, JSON.stringify(task))
    .hset(C.QUEUE.REDIS_PREFIX.STATUS, id, C.QUEUE.TASK_STATUS.DELAY)
    .hset(C.QUEUE.REDIS_PREFIX.TOPIC, id, topic)
    .zadd(C.QUEUE.REDIS_PREFIX.DELAY, task.timestamp, id)
    .execAsync();
}
/**
 * Poll one topic: pop a task, run the handler on it, mark it finished, then
 * schedule the next poll with setTimeout once this round has settled.
 *
 * @param {string} topic Topic whose ready list is polled.
 * @param {Function} fn Handler called with the task object. It may return a
 *   promise/thenable or a plain value; the task is finished once that value
 *   has resolved.
 * @param {number} frequency Delay in milliseconds passed to setTimeout
 *   before the next poll.
 * @returns {Promise} Resolves with `{}` when no task was available, with
 *   finishTask's result after a handled task, or with undefined after a
 *   logged error. This promise never rejects.
 */
function onTopic(topic, fn, frequency) {
  return popTask(topic).then(task => {
    if (_.isEmpty(task)) {
      // Nothing to do this round: short-circuit to the catch handler.
      return Promise.reject({
        isBreak: true,
        result: {}
      });
    }
    // Promise.resolve() accepts synchronous handlers and foreign thenables.
    // Calling `.then` directly on fn's return value would throw for a
    // handler that returns nothing, leaving the task unfinished.
    return Promise.resolve(fn(task)).then(() => finishTask(task.id));
  }).catch(err => {
    if (err && err.isBreak) {
      return Promise.resolve(err.result);
    }
    // Handler or redis failure: log it and keep the polling loop alive.
    logger.error(err);
    return Promise.resolve();
  }).finally(() => {
    setTimeout(onTopic, frequency, topic, fn, frequency);
  });
}
/**
 * Run one scan of the delay set, then re-arm itself with setTimeout.
 * A rejected scan is logged and the loop continues.
 *
 * @param {number} interval Delay in milliseconds passed to setTimeout
 *   before the next scan.
 */
function scanDelayInterval(interval) {
  const report = err => {
    logger.error(err);
  };
  const rearm = () => {
    setTimeout(scanDelayInterval, interval, interval);
  };

  scanDelay().catch(report).finally(rearm);
}
/**
 * Run one buffer clean-up pass, then re-arm itself with setTimeout.
 * A rejected pass is logged and the loop continues.
 *
 * @param {number} interval Delay in milliseconds passed to setTimeout
 *   before the next pass.
 */
function cleanBufferInverval(interval) {
  const report = err => {
    logger.error(err);
  };
  const rearm = () => {
    setTimeout(cleanBufferInverval, interval, interval);
  };

  cleanBuffer().catch(report).finally(rearm);
}
/**
 * Evaluate C.QUEUE.PICK_SCRIPT once over a one-hour window ending now.
 *
 * 'begin' and 'end' are sent as the script's two key names; the window
 * bounds (epoch milliseconds) follow as the script arguments.
 *
 * @returns {Promise} The script's reply.
 */
function scanDelay() {
  const windowEnd = Date.now();
  const windowStart = windowEnd - 3600 * 1000;
  return redis.evalAsync(C.QUEUE.PICK_SCRIPT, 2, 'begin', 'end', windowStart, windowEnd);
}
/**
 * Move one id from the topic's ready list into the buffer list and load the
 * matching task.
 *
 * When the task's status is READY it is re-scheduled at now + ttr (via
 * delayTask) before being handed back.
 *
 * @param {string} topic Topic whose ready list is popped.
 * @returns {Promise} Resolves with the task object, or with undefined when
 *   the list was empty or the task's status is not READY. Other failures
 *   reject.
 */
function popTask(topic) {
  const readyKey = `${C.QUEUE.REDIS_PREFIX.READY}:${topic}`;
  // Sentinel rejection used to skip the remaining steps.
  const stop = () => Promise.reject({
    isBreak: true
  });

  return redis.rpoplpushAsync(readyKey, C.QUEUE.REDIS_PREFIX.BUFFER)
    .then(taskId => (_.isEmpty(taskId) ? stop() : taskById(taskId)))
    .then(task => {
      if (task.status === C.QUEUE.TASK_STATUS.READY) {
        const retryAt = Date.now() + task.ttr;
        return delayTask(retryAt, task.id).then(() => task);
      }
      return stop();
    })
    .catch(err => {
      const isSentinel = err && err.isBreak;
      if (!isSentinel) {
        return Promise.reject(err);
      }
      return err.result;
    });
}
/**
 * Evaluate C.QUEUE.CLEAN_SCRIPT once; the script is called with no keys.
 *
 * @returns {Promise} The script's reply.
 */
function cleanBuffer() {
  const keyCount = 0;
  return redis.evalAsync(C.QUEUE.CLEAN_SCRIPT, keyCount);
}
/**
 * Mark a task as done, hand its record to proxy.queue.insertFinishTask, and
 * remove its bookkeeping (status, topic, pool entry, delay-set member).
 *
 * The status is set to OK before anything else. If the task can no longer
 * be loaded (for example, it was already finished once), that freshly
 * written status field would otherwise be left behind, so this path purges
 * the bookkeeping as well.
 *
 * @param {string} id Task id.
 * @returns {Promise} Resolves with the MULTI/EXEC result after the task was
 *   recorded and removed, or with undefined when the task was missing.
 *   Redis or proxy failures reject.
 */
function finishTask(id) {
  // Remove every trace of the task id in one MULTI.
  const purge = () => redis.multi()
    .hdel(C.QUEUE.REDIS_PREFIX.STATUS, id)
    .hdel(C.QUEUE.REDIS_PREFIX.TOPIC, id)
    .hdel(C.QUEUE.REDIS_PREFIX.POOL, id)
    .zrem(C.QUEUE.REDIS_PREFIX.DELAY, id)
    .execAsync();

  return redis.hsetAsync(C.QUEUE.REDIS_PREFIX.STATUS, id, C.QUEUE.TASK_STATUS.OK).then(() => {
    return taskById(id);
  }).then(task => {
    if (_.isEmpty(task)) {
      // Nothing to record; drop the orphan status written above, then
      // short-circuit with the usual sentinel so the result stays undefined.
      return purge().then(() => Promise.reject({
        isBreak: true
      }));
    }
    return proxy.queue.insertFinishTask({
      taskId: task.id,
      topic: task.topic,
      timestamp: new Date(task.timestamp),
      ttr: task.ttr,
      body: JSON.stringify(task.body),
      status: task.status
    });
  }).then(purge).catch(err => {
    if (err && err.isBreak) {
      return Promise.resolve(err.result);
    }
    return Promise.reject(err);
  });
}
/**
 * Put a task id into the delay sorted set with the given score and set its
 * status to DELAY, both in one MULTI.
 *
 * @param {number} timestamp Score for the delay set (callers pass epoch ms).
 * @param {string} id Task id.
 * @returns {Promise} Result of the MULTI/EXEC.
 */
function delayTask(timestamp, id) {
  const { DELAY, STATUS } = C.QUEUE.REDIS_PREFIX;
  const delayedStatus = C.QUEUE.TASK_STATUS.DELAY;

  return redis.multi()
    .zadd(DELAY, timestamp, id)
    .hset(STATUS, id, delayedStatus)
    .execAsync();
}
/**
 * Load a task's status and serialized record in one MULTI.
 *
 * @param {string} id Task id.
 * @returns {Promise<Object>} The parsed task merged over `{ status }`, or an
 *   empty object when either the status or the pool entry is missing.
 */
function taskById(id) {
  const { STATUS, POOL } = C.QUEUE.REDIS_PREFIX;

  return redis.multi()
    .hget(STATUS, id)
    .hget(POOL, id)
    .execAsync()
    .then(replies => {
      const status = replies[0];
      const serialized = replies[1];
      const incomplete = _.isEmpty(status) || _.isEmpty(serialized);
      if (incomplete) {
        return {};
      }
      return _.assign({ status: status }, JSON.parse(serialized));
    });
}
/* C.QUEUE.PICK_SCRIPT: lua script
local target = redis.call("zrangebyscore", "common:queue:delay", ARGV[1], ARGV[2], "limit", 0, 1)[1]
if target == nil then
  return nil
end
local status = redis.call("hget", "common:queue:status", target)
if status == "delay" then
  local topic = redis.call("hget", "common:queue:topic", target)
  if not topic then
    topic = "default"
  end
  redis.call("lpush", "common:queue:ready:"..topic, target)
  redis.call("hset", "common:queue:status", target, "ready")
  redis.call("zrem", "common:queue:delay", target)
  return target
end
if status == "ok" then
  redis.call("zrem", "common:queue:delay", target)
end
return nil
*/
/* C.QUEUE.CLEAN_SCRIPT: lua script
if redis.call("llen", "common:queue:buffer") == 0 then
  return nil
end
local last = redis.call("lindex", "common:queue:buffer", -1)
if redis.call("hexists", "common:queue:pool", last) == 0 then
  return redis.call("rpop", "common:queue:buffer")
end
if redis.call("hget", "common:queue:status", last) == "ok" then
  return redis.call("rpop", "common:queue:buffer")
end
return redis.call("rpoplpush", "common:queue:buffer", "common:queue:buffer")
*/
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment