Created
February 9, 2018 23:35
-
-
Save kendru/d393ea1ac8a95ebf2978fa07c5c4231f to your computer and use it in GitHub Desktop.
Asynchronous work queue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const promise_ = Symbol('promise'); | |
class Deferred { | |
constructor() { | |
this[promise_] = new Promise((resolve, reject) => { | |
this.resolve = resolve; | |
this.reject = reject; | |
}); | |
} | |
get promise() { | |
return this[promise_]; | |
} | |
} | |
function delay(ms) { | |
return new Promise(resolve => setTimeout(resolve, ms)); | |
} | |
// A task is essentially a deferred that can contain a payload that the task | |
function task(type, payload) { | |
const d = new Deferred(); | |
return Object.assign(d, { type, payload }); | |
} | |
// Creates a queue that can | |
function queue() { | |
const _q = []; | |
const failed = []; | |
return { | |
add(t) { | |
_q.push(t); | |
}, | |
peek() { | |
return _q[0]; | |
}, | |
remove() { | |
return _q.shift(); | |
}, | |
nack(task) { | |
failed.push({ task, ts: Date.now() }); | |
}, | |
failures() { | |
return failed; | |
} | |
}; | |
} | |
function workLoop(queue, processors = []) { | |
const procsByType = processors.reduce((idx, proc) => { | |
if (!idx[proc.type]) { | |
idx[proc.type] = []; | |
} | |
idx[proc.type].push(proc); | |
return idx; | |
}, {}); | |
async function go() { | |
let next; | |
while (task = queue.remove()) { | |
const { type, payload, resolve, reject } = task; | |
if (!procsByType[type]) { | |
queue.nack(task); | |
continue; | |
} | |
try { | |
const res = await Promise.all(procsByType[type].map(proc => proc.process(payload))); | |
resolve(res); | |
} catch (err) { | |
reject(err); | |
} | |
} | |
setTimeout(go, 100); | |
} | |
return { go } | |
}; | |
function processor(type, process) { | |
return { type, process }; | |
} | |
function waitProcessor(mult = 1) { | |
return processor('delay', async function process(payload) { | |
await delay(payload.delay * mult); | |
return Date.now(); | |
}); | |
} | |
let t1 = task('delay', { delay: 1000 }); | |
let t2 = task('delay', { delay: 6000 }); | |
let t3 = task('delay', { delay: 500 }); | |
let t4 = task('delay', { delay: 13000 }); | |
t1.promise.then(res => console.log('Finished task 1', res)); | |
t2.promise.then(res => console.log('Finished task 2', res)); | |
t3.promise.then(res => console.log('Finished task 3', res)); | |
t4.promise.then(res => console.log('Finished task 4', res)); | |
let q = queue(); | |
let p = waitProcessor(); | |
let p1 = waitProcessor(1.5); | |
let w = workLoop(q, [p, p1]); | |
q.add(t1); | |
q.add(t2); | |
w.go(); | |
q.add(t3); | |
q.add(t4); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment