-
-
Save dbaynard/0b5609ee25bec43b4fc2aeb4c37b6a4a to your computer and use it in GitHub Desktop.
const applyAsync = (f, pool) => (x, i) => | |
Promise.resolve(x) | |
.then((v) => new Promise((r) => pool.push({ i, v, r }))) | |
.then(f); | |
const worker = (getTimeout) => (it, i) => { | |
const { done, value } = it.next(); | |
if (done) return; | |
const { v, r } = value; | |
setTimeout( | |
() => | |
Promise.resolve(r(v)).then(() => { | |
setTimeout(() => { | |
worker(getTimeout)(it, i); | |
}, getTimeout()); | |
}), | |
0 | |
); | |
}; | |
/**
 * Start `n` workers that drain `queue` through one shared iterator.
 *
 * The start is deferred with `queueMicrotask`; at that point the queue is
 * emptied with `splice(0)` and the removed jobs are handed to the workers.
 * Jobs pushed onto `queue` after that snapshot are not picked up.
 *
 * @param n          Number of workers to start.
 * @param queue      Array of pending jobs; it is emptied in place.
 * @param getTimeout Delay provider forwarded to each worker.
 */
const runWorkers = (n, queue, getTimeout) =>
  queueMicrotask(() => {
    // One iterator shared by every worker, so each job is taken exactly once.
    const jobs = queue.splice(0).values();
    const startWorker = worker(getTimeout);
    for (let i = 0; i < n; i += 1) startWorker(jobs, i);
  });
/**
 * Map `f` over `xs`, releasing the work through `n` workers.
 *
 * Every element gets a promise immediately; the elements are queued as jobs
 * and released by the workers started via `runWorkers`, each of which waits
 * `getTimeout()` milliseconds between its consecutive jobs.
 *
 * @param n          Number of workers.
 * @param f          Function applied to each element.
 * @param xs         Input array.
 * @param getTimeout Optional delay provider in milliseconds; defaults to a
 *                   constant 300, the value that was previously hard-coded.
 * @returns An array of promises, one per element of `xs`, in input order.
 */
const mapConcurrentlyN = (n, f, xs, getTimeout = () => 300) => {
  const queue = [];
  const ys = xs.map(applyAsync(f, queue));
  runWorkers(n, queue, getTimeout);
  return ys;
};
/**
 * A queued unit of work.
 *
 * `i` is the element's index in the input array, `v` the input (possibly still
 * a pending promise) and `r` the resolver that releases the job. `r` receives
 * the *input* value, not the mapped result, so it is typed on `A`. `B` is kept
 * so existing `Job<A, B>` references continue to compile.
 */
type Job<A, B = unknown> = {
  i: number;
  v: A | PromiseLike<A>;
  r: (value: A | PromiseLike<A>) => void;
};

/**
 * Build a mapper that parks each input behind a job queued on `pool`.
 *
 * The returned promise stays pending until the job's `r` is called with `v`;
 * it then settles with `f` applied to the settled input.
 *
 * @param f    Function applied to each value once its job has been released.
 * @param pool Array that collects the pending jobs.
 * @returns A `(x, i)` mapper suitable for `Array.prototype.map`.
 */
const applyAsync =
  <A, B>(f: (_: A) => B, pool: Job<A, B>[]) =>
  (x: A | PromiseLike<A>, i: number): Promise<B> =>
    new Promise<A>((release) => {
      // Enqueue synchronously: runWorkers snapshots the pool exactly once, so
      // a job pushed only after `x` settled could be missed and hang forever.
      // `release(x)` adopts `x` when it is a thenable.
      pool.push({ i, v: x, r: release });
    }).then(f);

/**
 * Create a worker step that releases jobs from a shared iterator one at a time.
 *
 * Each step pulls the next job, calls `r(v)` on a zero-delay timer and then
 * schedules the following step after `getTimeout()` milliseconds. The worker
 * stops when the iterator is exhausted.
 *
 * @param getTimeout Returns the delay, in milliseconds, between releases.
 * @returns A function taking the shared job iterator and the worker's index.
 */
const worker =
  (getTimeout: () => number) =>
  <A, B>(it: Iterator<Job<A, B>>, i: number): void => {
    const next = it.next();
    if (next.done) return;
    const { v, r } = next.value;
    setTimeout(() => {
      void Promise.resolve(r(v)).then(() => {
        setTimeout(() => {
          worker(getTimeout)(it, i);
        }, getTimeout());
      });
    }, 0);
  };

/**
 * Start `n` workers that drain `queue` through one shared iterator.
 *
 * The start is deferred with `queueMicrotask`; the queue is then emptied with
 * `splice(0)` and the removed jobs are handed to the workers.
 *
 * @param n          Number of workers to start.
 * @param queue      Array of pending jobs; it is emptied in place.
 * @param getTimeout Delay provider forwarded to each worker.
 */
const runWorkers = <A, B>(
  n: number,
  queue: Job<A, B>[],
  getTimeout: () => number
): void =>
  queueMicrotask(() => {
    // One iterator shared by every worker, so each job is taken exactly once.
    const jobs = queue.splice(0).values();
    const startWorker = worker(getTimeout);
    for (let i = 0; i < n; i += 1) startWorker(jobs, i);
  });

/**
 * Map `f` over `xs`, releasing the work through `n` workers.
 *
 * Every element gets a promise immediately; the jobs are released by the
 * workers, each of which waits `getTimeout()` milliseconds between its
 * consecutive jobs. Elements of `xs` may themselves be promises.
 *
 * @param n          Number of workers.
 * @param f          Function applied to each (settled) element.
 * @param xs         Input values or promises of values.
 * @param getTimeout Optional delay provider in milliseconds; defaults to a
 *                   constant 300, the value that was previously hard-coded.
 * @returns An array of promises, one per element of `xs`, in input order.
 */
const mapConcurrentlyN = <A, B>(
  n: number,
  f: (_: A) => B,
  xs: readonly (A | PromiseLike<A>)[],
  getTimeout: () => number = () => 300
): Promise<B>[] => {
  const queue: Job<A, B>[] = [];
  const ys = xs.map(applyAsync(f, queue));
  runWorkers(n, queue, getTimeout);
  return ys;
};
I should wrap the call establishing the worker threads in a `new Promise`, pass the `resolve` function to the worker, and then call `resolve` when the last task completes (there’s some subtlety around completion). I can call `resolve` multiple times — see “javascript - What happens if i reject / resolve multiple times in Kriskowal's q?” on Stack Overflow.
Yeah, that didn’t work, but I used `queue.splice(0)` when generating the iterator instead.
I might review the API/naming at some point. I’ve kept the `getTimeout` function as I want that in the `worker` API. `applyAsync` should probably be named `queue` 🤷
OK, but just so I remember next time… I create the promises for each value in the original array in microtasks, and so I have to call `queueMicrotask` in `runWorkers` in order to run the queue at the end. Maybe I should just use `setTimeout(…, 0)`, though 🤔
`mapConcurrentlyN` is (possibly) very broken if the array passed in contains non-trivial promises 🤷
Oh, and I should pass `performance.now()` to `getTimeout`, or something similar, so the timeout function is actually useful.
Better than most of these