Skip to content

Instantly share code, notes, and snippets.

@kendru
Created February 9, 2018 23:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kendru/d393ea1ac8a95ebf2978fa07c5c4231f to your computer and use it in GitHub Desktop.
Save kendru/d393ea1ac8a95ebf2978fa07c5c4231f to your computer and use it in GitHub Desktop.
Asynchronous work queue
// Symbol key under which a Deferred keeps its underlying Promise.
const promise_ = Symbol('promise');

/**
 * A Promise whose settle functions are exposed on the instance.
 *
 * `resolve` and `reject` are own properties of the instance, and the
 * promise itself is reachable through the read-only `promise` accessor.
 */
class Deferred {
  constructor() {
    const settlers = {};
    this[promise_] = new Promise((fulfil, fail) => {
      settlers.resolve = fulfil;
      settlers.reject = fail;
    });
    // Copy the captured settle functions onto the instance as own properties.
    Object.assign(this, settlers);
  }

  /** @returns {Promise<*>} The promise controlled by `resolve` / `reject`. */
  get promise() {
    return this[promise_];
  }
}
/**
 * Returns a promise that fulfils (with `undefined`) once `ms` milliseconds
 * have elapsed, as measured by `setTimeout`.
 *
 * @param {number} ms - Timeout passed straight to `setTimeout`.
 * @returns {Promise<void>}
 */
function delay(ms) {
  return new Promise((done) => {
    setTimeout(done, ms);
  });
}
// A task is essentially a Deferred that additionally carries a `type` tag and
// a `payload`. The work loop uses `type` to pick processors, hands them the
// `payload`, and settles the task through its `resolve` / `reject` functions.
function task(type, payload) {
  const deferred = new Deferred();
  deferred.type = type;
  deferred.payload = payload;
  return deferred;
}
// Creates an in-memory FIFO queue of tasks. Besides add / peek / remove, it
// keeps a log of tasks that a consumer refused via `nack`, each stamped with
// the time of the refusal; `failures()` returns that log.
function queue() {
  const pending = [];
  const rejected = [];

  const add = (t) => {
    pending.push(t);
  };

  // Oldest entry without removing it; undefined when the queue is empty.
  const peek = () => {
    const [head] = pending;
    return head;
  };

  // Removes and returns the oldest entry; undefined when the queue is empty.
  const remove = () => pending.shift();

  const nack = (task) => {
    const entry = { task, ts: Date.now() };
    rejected.push(entry);
  };

  // Returns the live failure log (not a copy).
  const failures = () => rejected;

  return { add, peek, remove, nack, failures };
}
/**
 * Builds a polling worker that drains `queue` and dispatches every task to all
 * processors registered for the task's `type`.
 *
 * For each task removed from the queue:
 *   - if no processor is registered for its type, the task is handed to
 *     `queue.nack(task)` and is neither resolved nor rejected;
 *   - otherwise every matching processor's `process(payload)` is invoked, the
 *     task is resolved with the array of their results (registration order),
 *     or rejected with the first error.
 * When the queue is empty the worker re-polls after 100 ms.
 *
 * @param {{ remove: Function, nack: Function }} queue - `remove()` returns the
 *   next task or a falsy value when empty; `nack(task)` records an unhandled task.
 * @param {Array<{ type: string, process: Function }>} [processors=[]]
 * @returns {{ go: Function, stop: Function }} `go()` starts (or restarts) the
 *   loop and returns a promise for the current drain pass; `stop()` cancels
 *   pending polls and makes the loop stop taking new tasks.
 */
function workLoop(queue, processors = []) {
  // A Map (not a plain object) so that types such as "constructor" or
  // "toString" cannot collide with Object.prototype members.
  const procsByType = new Map();
  for (const proc of processors) {
    const sameType = procsByType.get(proc.type);
    if (sameType) {
      sameType.push(proc);
    } else {
      procsByType.set(proc.type, [proc]);
    }
  }

  // Pending poll timers, tracked so stop() can cancel them.
  const timers = new Set();
  let stopped = false;

  async function go() {
    stopped = false;
    while (!stopped) {
      // Local binding: the original assigned to an undeclared `task`, which
      // clobbered the outer `task` function (or threw in strict mode).
      const next = queue.remove();
      if (!next) {
        break;
      }
      const { type, payload, resolve, reject } = next;
      const procs = procsByType.get(type);
      if (!procs) {
        queue.nack(next);
        continue;
      }
      try {
        const res = await Promise.all(procs.map((proc) => proc.process(payload)));
        resolve(res);
      } catch (err) {
        reject(err);
      }
    }
    if (!stopped) {
      const timer = setTimeout(() => {
        timers.delete(timer);
        go();
      }, 100);
      timers.add(timer);
    }
  }

  function stop() {
    stopped = true;
    for (const timer of timers) {
      clearTimeout(timer);
    }
    timers.clear();
  }

  return { go, stop };
}
/**
 * Bundles a task type with the function that handles payloads of that type.
 *
 * @param {string} type - Task type this processor is registered for.
 * @param {Function} process - Handler invoked with a task's payload.
 * @returns {{ type: string, process: Function }}
 */
function processor(type, process) {
  const descriptor = { type: type, process: process };
  return descriptor;
}
/**
 * Creates a processor for 'delay' tasks. Its handler waits for
 * `payload.delay * mult` milliseconds and then yields the current timestamp.
 *
 * @param {number} [mult=1] - Factor applied to each payload's `delay`.
 * @returns {{ type: string, process: Function }}
 */
function waitProcessor(mult = 1) {
  const process = async (payload) => {
    const waitMs = payload.delay * mult;
    await delay(waitMs);
    return Date.now();
  };
  return processor('delay', process);
}
// Demo: four 'delay' tasks served by two wait processors (1x and 1.5x).
const [t1, t2, t3, t4] = [1000, 6000, 500, 13000].map((ms) => task('delay', { delay: ms }));

// Log each task's result once the work loop resolves it.
const labelled = [
  [t1, 'Finished task 1'],
  [t2, 'Finished task 2'],
  [t3, 'Finished task 3'],
  [t4, 'Finished task 4'],
];
for (const [job, label] of labelled) {
  job.promise.then((res) => console.log(label, res));
}

const q = queue();
const p = waitProcessor();
const p1 = waitProcessor(1.5);
const w = workLoop(q, [p, p1]);

// Two tasks are queued before the loop starts, two after it is already running.
[t1, t2].forEach((job) => q.add(job));
w.go();
[t3, t4].forEach((job) => q.add(job));
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment