Skip to content

Instantly share code, notes, and snippets.

@shasharoman
Created June 25, 2018 10:04
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save shasharoman/2b3255e0d2c663a58a874b92a5da6fc5 to your computer and use it in GitHub Desktop.
Save shasharoman/2b3255e0d2c663a58a874b92a5da6fc5 to your computer and use it in GitHub Desktop.
core redis-queue logic
const Promise = require('bluebird');
const _ = require('lodash');
const uuid = require('uuid');
const C = require('../constant');
const proxy = require('../proxy');
const lib = require(process.env.lib);
const redis = lib.redis.createClient(C.QUEUE.REDIS);
const logger = lib.logger.createLogger('common-service-queue');
exports.pushTask = pushTask;
exports.onTopic = onTopic;
exports.scanDelayInterval = scanDelayInterval;
exports.cleanBufferInverval = cleanBufferInverval;
function pushTask(id, topic, delay, ttr, body) {
id = (id || uuid.v4()).toString();
topic = (topic || 'default').toString();
delay = Number(delay) || 30;
ttr = Number(ttr) || 30;
let task = {
id: id,
topic: topic,
timestamp: Date.now() + delay * 1000,
ttr: ttr * 1000,
body: body
};
return redis.multi()
.hset(C.QUEUE.REDIS_PREFIX.POOL, id, JSON.stringify(task))
.hset(C.QUEUE.REDIS_PREFIX.STATUS, id, C.QUEUE.TASK_STATUS.DELAY)
.hset(C.QUEUE.REDIS_PREFIX.TOPIC, id, topic)
.zadd(C.QUEUE.REDIS_PREFIX.DELAY, task.timestamp, id)
.execAsync();
}
function onTopic(topic, fn, frequency) {
return popTask(topic).then(ret => {
if (_.isEmpty(ret)) {
return Promise.reject({
isBreak: true,
result: {}
});
}
return fn(ret).then(() => finishTask(ret.id));
}).catch(err => {
if (err && err.isBreak) {
return Promise.resolve(err.result);
}
logger.error(err);
return Promise.resolve();
}).finally(() => {
setTimeout(onTopic, frequency, topic, fn, frequency);
});
}
function scanDelayInterval(interval) {
scanDelay().catch(err => {
logger.error(err);
}).finally(() => {
setTimeout(scanDelayInterval, interval, interval);
});
}
function cleanBufferInverval(interval) {
cleanBuffer().catch(err => {
logger.error(err);
}).finally(() => {
setTimeout(cleanBufferInverval, interval, interval);
});
}
function scanDelay() {
let now = Date.now();
return redis.evalAsync(C.QUEUE.PICK_SCRIPT, 2, 'begin', 'end', now - 3600 * 1000, now);
}
function popTask(topic) {
let key = `${C.QUEUE.REDIS_PREFIX.READY}:${topic}`;
return redis.rpoplpushAsync(key, C.QUEUE.REDIS_PREFIX.BUFFER).then(ret => {
if (_.isEmpty(ret)) {
return Promise.reject({
isBreak: true
});
}
return taskById(ret);
}).then(ret => {
if (ret.status !== C.QUEUE.TASK_STATUS.READY) {
return Promise.reject({
isBreak: true
});
}
return delayTask(Date.now() + ret.ttr, ret.id).then(() => ret);
}).catch(err => {
if (err && err.isBreak) {
return Promise.resolve(err.result);
}
return Promise.reject(err);
});
}
function cleanBuffer() {
return redis.evalAsync(C.QUEUE.CLEAN_SCRIPT, 0);
}
function finishTask(id) {
return redis.hsetAsync(C.QUEUE.REDIS_PREFIX.STATUS, id, C.QUEUE.TASK_STATUS.OK).then(() => {
return taskById(id);
}).then(ret => {
if (_.isEmpty(ret)) {
return Promise.reject({
isBreak: true
});
}
return proxy.queue.insertFinishTask({
taskId: ret.id,
topic: ret.topic,
timestamp: new Date(ret.timestamp),
ttr: ret.ttr,
body: JSON.stringify(ret.body),
status: ret.status
});
}).then(() => {
return redis.multi()
.hdel(C.QUEUE.REDIS_PREFIX.STATUS, id)
.hdel(C.QUEUE.REDIS_PREFIX.TOPIC, id)
.hdel(C.QUEUE.REDIS_PREFIX.POOL, id)
.zrem(C.QUEUE.REDIS_PREFIX.DELAY, id)
.execAsync();
}).catch(err => {
if (err && err.isBreak) {
return Promise.resolve(err.result);
}
return Promise.reject(err);
});
}
function delayTask(timestamp, id) {
return redis.multi()
.zadd(C.QUEUE.REDIS_PREFIX.DELAY, timestamp, id)
.hset(C.QUEUE.REDIS_PREFIX.STATUS, id, C.QUEUE.TASK_STATUS.DELAY)
.execAsync();
}
function taskById(id) {
return redis.multi()
.hget(C.QUEUE.REDIS_PREFIX.STATUS, id)
.hget(C.QUEUE.REDIS_PREFIX.POOL, id)
.execAsync().then(ret => {
if (_.isEmpty(ret[0]) || _.isEmpty(ret[1])) {
return Promise.resolve({});
}
return Promise.resolve(_.assign({
status: ret[0]
}, JSON.parse(ret[1])));
});
}
/* C.QUEUE.PICK_SCRIPT: lua script
local target = redis.call("zrangebyscore", "common:queue:delay", ARGV[1], ARGV[2], "limit", 0, 1)[1]
if target == nil then
return nil
end
local status = redis.call("hget", "common:queue:status", target)
if status == "delay" then
local topic = redis.call("hget", "common:queue:topic", target)
if not topic then
topic = "default"
end
redis.call("lpush", "common:queue:ready:"..topic, target)
redis.call("hset", "common:queue:status", target, "ready")
redis.call("zrem", "common:queue:delay", target)
return target
end
if status == "ok" then
redis.call("zrem", "common:queue:delay", target)
end
return nil
*/
/* C.QUEUE.CLEAN_SCRIPT: lua script
if redis.call("llen", "common:queue:buffer") == 0 then
return nil
end
local last = redis.call("lindex", "common:queue:buffer", -1)
if redis.call("hexists", "common:queue:pool", last) == 0 then
return redis.call("rpop", "common:queue:buffer")
end
if redis.call("hget", "common:queue:status", last) == "ok" then
return redis.call("rpop", "common:queue:buffer")
end
return redis.call("rpoplpush", "common:queue:buffer", "common:queue:buffer")
*/
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment