Last active
January 6, 2022 17:23
-
-
Save derhuerst/87a32b61ea33ff047f5b2e30d1b51c36 to your computer and use it in GitHub Desktop.
persisted, idempotent & timed queue of async functions
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"private": true, | |
"name": "persisted-idempotent-timed-promise-queue", | |
"description": "persisted, idempotent & timed queue of async functions", | |
"version": "2.0.0", | |
"main": "persisted-idempotent-timed-queue.js", | |
"author": "Jannis R <mail@jannisr.de>", | |
"license": "ISC", | |
"engines": { | |
"node": ">=14" | |
}, | |
"dependencies": { | |
"sorted-array-functions": "^1.3.0" | |
}, | |
"peerDependencies": { | |
"ioredis": "^4.17.3" | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict' | |
const {strictEqual, ok} = require('assert') | |
const {gt: greaterThan} = require('sorted-array-functions') | |
const createPersistedIdempotentTimedQueue = (processTask, onError, opt = {}) => { | |
const { | |
concurrency, | |
redis, | |
redisNs, | |
} = { | |
concurrency: 8, | |
redis: null, | |
redisNs: '', | |
...opt, | |
} | |
const tasks = [] // {id, t}, ordered by t | |
const taskIds = new Set() // task IDs, for efficient lookup | |
// currently scheduled tasks, at most `concurrency` | |
let started = true | |
const scheduled = new Map() // task -> timer | |
const compareTasksByT = (taskA, taskB) => taskA.t - taskB.t | |
const _put = (id, t, persist) => { | |
// potentially remove old task with same ID | |
if (taskIds.has(id)) { | |
const prevIdx = tasks.find(t => t.id === id) | |
const prevTask = tasks[prevIdx] | |
// remove old task | |
unschedule(prevTask) | |
tasks.splice(prevIdx, 1) | |
out.size-- | |
} | |
// add new task | |
const task = {id, t} | |
let idx = greaterThan(tasks, task, compareTasksByT) | |
if (idx === -1) { | |
idx = tasks.length | |
tasks.push(task) | |
} else { | |
tasks.splice(idx, 0, task) | |
} | |
out.size++ | |
// persist task in Redis | |
if (persist && redis) { | |
redis.set(redisNs + id, t + '') | |
.catch(onError) | |
} | |
} | |
const put = (id, when, absolute = false, persist = true, schedule = true) => { | |
if (process.env.NODE_ENV !== 'production') { | |
strictEqual(typeof id, 'string', 'id must be a string') | |
ok(id, 'id must not be empty') | |
strictEqual(typeof when, 'number', 'when must be a number') | |
ok(when >= (absolute ? Date.now() : 0), 'when must be in the future') | |
} | |
const t = absolute ? when : Date.now() + when | |
_put(id, t, persist) | |
if (schedule) scheduleEnough() | |
} | |
const unschedule = (task) => { | |
if (!scheduled.has(task)) return; | |
const timer = scheduled.get(task) | |
clearTimeout(timer) | |
scheduled.delete(task) | |
} | |
const scheduleEnough = () => { | |
if (!started) return; | |
for (let i = 0; i < tasks.length && scheduled.size < concurrency; i++) { | |
const task = tasks[i] | |
if (scheduled.has(task)) continue | |
const ms = task.t - Date.now() | |
const timer = setTimeout(runTask, ms, task) | |
scheduled.set(task, timer) | |
} | |
} | |
const runTask = async (task) => { | |
const {id, t} = task | |
await processTask(id, t, out.size) | |
// remove timer & task | |
scheduled.delete(task) | |
tasks.splice(tasks.indexOf(task), 1) | |
out.size-- | |
redis.del(redisNs + id) | |
.catch(onError) | |
if (started && out.size > 0) scheduleEnough() | |
} | |
const start = () => { | |
if (started) return; | |
started = true | |
scheduleEnough() | |
} | |
const stop = () => { | |
if (!started) return; | |
started = false | |
for (const timer of scheduled.values()) { | |
clearTimeout(timer) | |
} | |
scheduled.clear() | |
} | |
const readPersistedTasks = async () => { | |
let cursor = '0' | |
while (true) { // eslint-disable-line no-constant-condition | |
const [ | |
newCursor, keys, | |
] = await redis.scan(cursor, 'COUNT', 300, 'MATCH', redisNs + '*') | |
cursor = newCursor | |
const op = redis.multi() | |
for (const key of keys) op.get(key) | |
const res = await op.exec() | |
for (let i = 0; i < res.length; i++) { | |
const [err, _t] = res[i] | |
if (err) throw err | |
const id = keys[i].slice(redisNs.length) | |
const t = parseInt(_t) | |
_put(id, t, false) | |
} | |
if (cursor === '0') break | |
} | |
scheduleEnough() | |
} | |
if (redis) { | |
readPersistedTasks().catch(onError) | |
} | |
// todo: del(id) | |
// todo: clearAll() | |
const out = { | |
size: 0, | |
put, | |
start, stop, | |
} | |
return out | |
} | |
module.exports = createPersistedIdempotentTimedQueue |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment