Skip to content

Instantly share code, notes, and snippets.

@conartist6
Last active May 20, 2024 02:51
Show Gist options
  • Save conartist6/61217bd5e767c0ef843b0e267e29a0dc to your computer and use it in GitHub Desktop.
Save conartist6/61217bd5e767c0ef843b0e267e29a0dc to your computer and use it in GitHub Desktop.
Concurrent buffering
// Utils
/**
 * Builds a "deferred": a promise packaged together with the two functions
 * that settle it, so it can be resolved or rejected from outside the
 * promise executor.
 *
 * @returns {{ resolve: Function, reject: Function, promise: Promise }}
 */
const buildDeferred = () => {
  let resolve;
  let reject;
  // The executor runs synchronously, so both settlers are captured before
  // the object below is assembled.
  const promise = new Promise((onResolve, onReject) => {
    resolve = onResolve;
    reject = onReject;
  });
  return { resolve, reject, promise };
};
/**
 * Returns a promise that fulfills (with no value) once a `setTimeout`
 * of `ms` milliseconds fires.
 *
 * @param {number} ms - Timeout passed straight to `setTimeout`.
 * @returns {Promise<undefined>}
 */
const delay = (ms) => {
  const { promise, resolve } = buildDeferred();
  setTimeout(resolve, ms);
  return promise;
};
/**
 * Lazily yields the values of a sync iterable for which `predicate`
 * returns a truthy result, preserving their order.
 *
 * @param {Iterable} iterable - Source of values.
 * @param {(value: any) => any} predicate - Called once per value.
 * @yields Each value accepted by `predicate`.
 */
function* filter(iterable, predicate) {
  for (const item of iterable) {
    if (!predicate(item)) {
      continue;
    }
    yield item;
  }
}
/** Predicate: true for every value except `null` itself (`undefined` passes). */
const notNull = (value) => !(value === null);
/**
 * Obtains an iterator from `iterable`, preferring its async iterator.
 * Falls back to the sync iterator when there is no `Symbol.asyncIterator`
 * method (or it returns a falsy value). Yields `undefined` when neither
 * protocol is implemented.
 *
 * @param {object} iterable - Value implementing either iteration protocol.
 * @returns {object|undefined} The iterator, if one could be obtained.
 */
const getIterator = (iterable) => {
  const fromAsync = iterable[Symbol.asyncIterator]?.();
  if (fromAsync) {
    return fromAsync;
  }
  return iterable[Symbol.iterator]?.();
};
/**
 * Tags an iterator step with the pool slot it belongs to.
 *
 * Waits for `stepPromise` (a step or a promise of one). A finished step
 * becomes `{ done: true, value: undefined, idx }`. Otherwise the step's
 * value is itself awaited (it may be a promise) and the result is
 * `{ done: false, value, idx }`.
 *
 * @param {Promise<object>|object} stepPromise - Result of an iterator's `next()`.
 * @param {number} idx - Slot index to attach to the result.
 * @returns {Promise<{ done: boolean, value: any, idx: number }>}
 */
const wrapWithIdx = (stepPromise, idx) => {
  const tagWithIdx = (result) => {
    if (result.done) {
      return { done: true, value: undefined, idx };
    }
    return Promise.resolve(result.value).then((value) => ({ done: false, value, idx }));
  };
  return Promise.resolve(stepPromise).then(tagWithIdx);
};
/**
 * Simulates one unit of async work: waits a random time of up to 500 ms,
 * then fulfills with the index it was given.
 *
 * @param {number} idx - Value to hand back after the wait.
 * @returns {Promise<number>}
 */
async function step(idx) {
  const waitMs = Math.random() * 500;
  await delay(waitMs);
  return idx;
}
/**
 * Sync generator of promises: each `next()` logs 'request' and immediately
 * yields the promise returned by `step(idx)` without waiting for it, so
 * several steps can be in flight at once.
 *
 * @param {number} [n=Infinity] - Number of steps to produce.
 * @yields {Promise<number>} Promise for the index of each step.
 */
function* concurrentRange(n = Number.POSITIVE_INFINITY) {
  for (let idx = 0; idx < n; idx++) {
    console.log('request');
    yield step(idx);
  }
}
/**
 * Async generator counting from 0 up to (but excluding) `n`. Before each
 * value it logs 'request' and waits a random time of up to 500 ms.
 *
 * @param {number} [n=Infinity] - Number of values to produce.
 * @yields {number} The next index.
 */
async function* asyncRange(n = Number.POSITIVE_INFINITY) {
  for (let idx = 0; idx < n; idx++) {
    console.log('request');
    await delay(Math.random() * 500);
    yield idx;
  }
}
// The thing I want to prove works:
// Buffering an async iterator without turning it into a sync iterator first
/**
 * Iterates `iterable` while keeping `n` `next()` requests in flight at once.
 *
 * Values are yielded in the order their requests settle, which may differ
 * from the source order. Each time a value arrives, a replacement request is
 * issued for the same pool slot; a slot is retired when its request reports
 * `done`, and iteration ends once every slot has been retired.
 *
 * If the consumer stops early (`break`/`return`/`throw`) or a request
 * rejects, the source iterator's `return()` is called (when it has one) so
 * the source can release its resources.
 *
 * @param {AsyncIterable|Iterable} iterable - Source to read from; a sync
 *   iterable may yield promises.
 * @param {number} n - Number of concurrent requests. Used as an array length
 *   and a loop bound, so it should be a positive integer.
 * @yields The resolved value of each non-final step.
 */
async function* bufferConcurrent(iterable, n) {
  const asyncIter = getIterator(iterable);
  const pool = new Array(n);
  for (let idx = 0; idx < n; idx++) {
    pool[idx] = wrapWithIdx(asyncIter.next(), idx);
  }

  let outstandingRequests = n;
  // Set only when the loop ends on its own; tells `finally` whether the
  // source still needs to be closed.
  let exhausted = false;
  try {
    while (outstandingRequests) {
      // Retired slots hold null and must be skipped when racing.
      const settled = await Promise.race(filter(pool, notNull));
      if (!settled.done) {
        // Refill the slot before yielding so the request runs while the
        // consumer is busy with the value.
        pool[settled.idx] = wrapWithIdx(asyncIter.next(), settled.idx);
        yield settled.value;
      } else {
        outstandingRequests--;
        pool[settled.idx] = null;
      }
    }
    exhausted = true;
  } finally {
    if (!exhausted) {
      // Early exit or failure: give the source a chance to clean up.
      await asyncIter.return?.();
    }
  }
}
// Testing
// Source is an async generator; buffers 5 values with 4 requests in flight
// and logs each value as it arrives.
Deno.test("Async generation", async () => {
  const buffered = bufferConcurrent(asyncRange(5), 4);
  for await (const element of buffered) {
    console.log(element);
  }
});
// Source is a sync generator of promises; buffers 5 values with 4 requests
// in flight and logs each value as it arrives.
Deno.test("Concurrent generation", async () => {
  const buffered = bufferConcurrent(concurrentRange(5), 4);
  for await (const element of buffered) {
    console.log(element);
  }
});
Async generation ...
------- output -------
request
request
0
request
1
request
2
request
3
4
----- output end -----
Async generation ... ok (1s)
Concurrent generation ...
------- output -------
request
request
request
request
request
2
1
0
4
3
----- output end -----
Concurrent generation ... ok (497ms)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment