Skip to content

Instantly share code, notes, and snippets.

@dbaynard
Last active March 16, 2022 23:13
Show Gist options
  • Save dbaynard/0b5609ee25bec43b4fc2aeb4c37b6a4a to your computer and use it in GitHub Desktop.
Save dbaynard/0b5609ee25bec43b4fc2aeb4c37b6a4a to your computer and use it in GitHub Desktop.
JS Console bounded async
const applyAsync = (f, pool) => (x, i) =>
Promise.resolve(x)
.then((v) => new Promise((r) => pool.push({ i, v, r })))
.then(f);
const worker = (getTimeout) => (it, i) => {
const { done, value } = it.next();
if (done) return;
const { v, r } = value;
setTimeout(
() =>
Promise.resolve(r(v)).then(() => {
setTimeout(() => {
worker(getTimeout)(it, i);
}, getTimeout());
}),
0
);
};
const runWorkers = (n, queue, getTimeout) =>
queueMicrotask(() =>
new Array(n).fill(queue.splice(0).values()).forEach(worker(getTimeout))
);
const mapConcurrentlyN = (n, f, xs) => {
const queue = [];
const ys = xs.map(applyAsync(f, queue));
runWorkers(n, queue, () => 300);
return ys;
};
type Job<A, B> = { i: number; v: A; r: (_: B) => void };
const applyAsync =
<A, B>(f: (_: A) => B, pool: Job<A, B>[]) =>
(x: A, i: number): Promise<B> =>
Promise.resolve(x)
.then((v) => new Promise((r) => pool.push({ i, v, r })))
.then(f);
const worker =
(getTimeout: () => number) =>
<A, B>(it: Iterable<Job<A, B>>, i: number) => {
const { done, value } = it.next();
if (done) return;
const { v, r } = value;
setTimeout(
() =>
Promise.resolve(r(v)).then(() => {
setTimeout(() => {
worker(getTimeout)(it, i);
}, getTimeout());
}),
0
);
};
const runWorkers = <A, B>(
n: number,
queue: Job<A, B>[],
getTimeout: () => number
) =>
queueMicrotask(() =>
new Array(n).fill(queue.splice(0).values()).forEach(worker(getTimeout))
);
const mapConcurrentlyN = <A, B>(
n: number,
f: (_: A) => B,
xs: A[]
): Promise<B>[] => {
const queue = [];
const ys = xs.map(applyAsync(f, queue));
runWorkers(n, queue, () => 300);
return ys;
};
@dbaynard
Copy link
Author

Better than most of these

@dbaynard
Copy link
Author

dbaynard commented Mar 16, 2022

I should wrap the call establishing the worker threads in a new Promise, pass the resolve function to the worker, and then call resolve when the last task completes (there’s some subtlety around completion). I can call resolve multiple times javascript - What happens if i reject / resolve multiple times in Kriskowal's q? - Stack Overflow

Yeah that didn’t work but I used queue.splice(0) when generating the iterator, instead.

@dbaynard
Copy link
Author

dbaynard commented Mar 16, 2022

I might review the api/naming, at some point. I’ve kept the getTimeout function as I want that in the worker API. applyAsync should probably be named queue 🤷

@dbaynard
Copy link
Author

dbaynard commented Mar 16, 2022

OK but just so I remember next time… I create the promises for each value in the original array in microtasks, and so I have to call queueMicrotask in runWorkers in order to run the queue at the end. Maybe I should just use setTimeout(…, 0), though 🤔

mapConcurrentlyN is (possibly) very broken if the array passed in contains non-trivial promises 🤷

Oh, and I should pass performance.now() to getTimeout, or something similar, so the timeout function is actually useful.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment