Skip to content

Instantly share code, notes, and snippets.

@derhuerst
Last active January 6, 2022 17:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save derhuerst/87a32b61ea33ff047f5b2e30d1b51c36 to your computer and use it in GitHub Desktop.
Save derhuerst/87a32b61ea33ff047f5b2e30d1b51c36 to your computer and use it in GitHub Desktop.
persisted, idempotent & timed queue of async functions
{
"private": true,
"name": "persisted-idempotent-timed-promise-queue",
"description": "persisted, idempotent & timed queue of async functions",
"version": "2.0.0",
"main": "persisted-idempotent-timed-queue.js",
"author": "Jannis R <mail@jannisr.de>",
"license": "ISC",
"engines": {
"node": ">=14"
},
"dependencies": {
"sorted-array-functions": "^1.3.0"
},
"peerDependencies": {
"ioredis": "^4.17.3"
}
}
'use strict'
const {strictEqual, ok} = require('assert')
const {gt: greaterThan} = require('sorted-array-functions')
/**
 * Creates a queue of tasks that are each identified by a string ID and run at
 * a specific point in time. Putting a task with an ID that is already queued
 * replaces the queued one (idempotency). If a Redis client is passed, every
 * task is additionally stored as `redisNs + id` -> timestamp, and previously
 * stored tasks are read back when the queue is created.
 *
 * The queue starts in the "started" state; call `stop()` to pause it.
 *
 * @param {function(string, number, number): Promise<void>} processTask
 *   Called as `processTask(id, t, size)` when a task is due; `size` is the
 *   number of queued tasks at that moment (including the one being run).
 * @param {function(Error): void} onError
 *   Receives errors from Redis commands and from a failing `processTask`.
 * @param {object} [opt]
 * @param {number} [opt.concurrency=8] Max. number of tasks that have a timer or are running.
 * @param {object|null} [opt.redis=null] Client providing `set`, `del`, `scan` and `multi` (ioredis-style).
 * @param {string} [opt.redisNs=''] Prefix for all Redis keys written/read by this queue.
 * @returns {{size: number, put: Function, start: Function, stop: Function}}
 */
const createPersistedIdempotentTimedQueue = (processTask, onError, opt = {}) => {
	const {
		concurrency,
		redis,
		redisNs,
	} = {
		concurrency: 8,
		redis: null,
		redisNs: '',
		...opt,
	}

	const tasks = [] // {id, t}, ordered by t
	const taskIds = new Set() // IDs of all entries in `tasks`, for efficient lookup
	let started = true
	// tasks with a pending timer or currently running, at most `concurrency`
	const scheduled = new Map() // task -> timer

	const compareTasksByT = (taskA, taskB) => taskA.t - taskB.t

	// Cancels the timer of a task (if it has one) and frees its slot.
	const unschedule = (task) => {
		if (!scheduled.has(task)) return;
		clearTimeout(scheduled.get(task))
		scheduled.delete(task)
	}

	// Inserts a task at its sorted position, replacing any task with the same ID.
	const _put = (id, t, persist) => {
		if (taskIds.has(id)) {
			// `findIndex`, not `find`: we need the position to splice at.
			const prevIdx = tasks.findIndex(queued => queued.id === id)
			if (prevIdx !== -1) {
				unschedule(tasks[prevIdx])
				tasks.splice(prevIdx, 1)
				out.size--
			}
		}

		const task = {id, t}
		// index of the first task that is due later than the new one, or -1
		const idx = greaterThan(tasks, task, compareTasksByT)
		if (idx === -1) tasks.push(task)
		else tasks.splice(idx, 0, task)
		taskIds.add(id)
		out.size++

		if (persist && redis) {
			redis.set(redisNs + id, t + '')
			.catch(onError)
		}
	}

	/**
	 * Adds a task, or replaces the queued task with the same ID.
	 *
	 * @param {string} id Non-empty task ID.
	 * @param {number} when Delay in ms from now, or an epoch timestamp in ms if `absolute`.
	 * @param {boolean} [absolute=false] Interpret `when` as an absolute timestamp.
	 * @param {boolean} [persist=true] Store the task in Redis (if a client was passed).
	 * @param {boolean} [schedule=true] Immediately set up timers for the next due tasks.
	 */
	const put = (id, when, absolute = false, persist = true, schedule = true) => {
		if (process.env.NODE_ENV !== 'production') {
			strictEqual(typeof id, 'string', 'id must be a string')
			ok(id, 'id must not be empty')
			strictEqual(typeof when, 'number', 'when must be a number')
			ok(when >= (absolute ? Date.now() : 0), 'when must be in the future')
		}

		const t = absolute ? when : Date.now() + when
		_put(id, t, persist)
		if (schedule) scheduleEnough()
	}

	// Sets up timers for the earliest tasks until `concurrency` slots are taken.
	const scheduleEnough = () => {
		if (!started) return;
		for (let i = 0; i < tasks.length && scheduled.size < concurrency; i++) {
			const task = tasks[i]
			if (scheduled.has(task)) continue
			const ms = task.t - Date.now()
			scheduled.set(task, setTimeout(runTask, ms, task))
		}
	}

	// Runs a due task, then removes it and fills the freed slot.
	const runTask = async (task) => {
		const {id, t} = task
		let failed = false
		try {
			await processTask(id, t, out.size)
		} catch (err) {
			// Report instead of leaving an unhandled rejection and a slot that
			// is blocked forever.
			failed = true
			onError(err)
		}

		scheduled.delete(task)
		// The task may have been replaced via put() while it was running; in
		// that case the newer entry (and its Redis key) must be left alone.
		const idx = tasks.indexOf(task)
		if (idx !== -1) {
			tasks.splice(idx, 1)
			taskIds.delete(id)
			out.size--
			// A failed task keeps its Redis entry, so it gets picked up again
			// the next time persisted tasks are read.
			if (redis && !failed) {
				redis.del(redisNs + id)
				.catch(onError)
			}
		}

		if (started && out.size > 0) scheduleEnough()
	}

	// Resumes scheduling after stop(); no-op if already started.
	const start = () => {
		if (started) return;
		started = true
		scheduleEnough()
	}

	// Cancels all pending timers; queued tasks are kept. No-op if already stopped.
	const stop = () => {
		if (!started) return;
		started = false
		for (const timer of scheduled.values()) {
			clearTimeout(timer)
		}
		scheduled.clear()
	}

	// Reads all tasks stored under `redisNs` and queues them without re-persisting.
	const readPersistedTasks = async () => {
		let cursor = '0'
		do {
			const [
				newCursor, keys,
			] = await redis.scan(cursor, 'COUNT', 300, 'MATCH', redisNs + '*')
			cursor = newCursor
			if (keys.length === 0) continue

			const op = redis.multi()
			for (const key of keys) op.get(key)
			const res = await op.exec()

			for (let i = 0; i < res.length; i++) {
				const [err, rawT] = res[i]
				if (err) throw err
				const id = keys[i].slice(redisNs.length)
				// Skip IDs that are already queued: either SCAN returned the key
				// twice, or put() was called in the meantime and is more recent.
				if (taskIds.has(id)) continue
				const t = Number.parseInt(rawT, 10)
				// Skip keys that vanished since the SCAN or hold a non-numeric value.
				if (Number.isNaN(t)) continue
				_put(id, t, false)
			}
		} while (cursor !== '0')
		scheduleEnough()
	}

	// todo: del(id)
	// todo: clearAll()

	const out = {
		size: 0,
		put,
		start, stop,
	}

	if (redis) {
		readPersistedTasks().catch(onError)
	}

	return out
}
// The queue factory is this module's only (CommonJS) export.
module.exports = createPersistedIdempotentTimedQueue
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment