Last active
May 20, 2024 02:51
-
-
Save conartist6/61217bd5e767c0ef843b0e267e29a0dc to your computer and use it in GitHub Desktop.
Concurrent buffering
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Utils | |
const buildDeferred = () => { | |
const d = {}; | |
d.promise = new Promise((resolve, reject) => { | |
d.resolve = resolve; | |
d.reject = reject; | |
}); | |
return d; | |
} | |
const delay = (ms) => { | |
const deferred = buildDeferred(); | |
setTimeout(deferred.resolve, ms); | |
return deferred.promise; | |
} | |
function *filter(iterable, predicate) { | |
for (const value of iterable) { | |
if (predicate(value)) { | |
yield value; | |
} | |
} | |
} | |
const notNull = value => value !== null; | |
const getIterator = (iterable) => iterable[Symbol.asyncIterator]?.() || iterable[Symbol.iterator]?.(); | |
const wrapWithIdx = (stepPromise, idx) => { | |
return Promise.resolve(stepPromise).then((step) => { | |
return step.done | |
? { done: true, value: undefined, idx } | |
: Promise.resolve(step.value).then(value => ({ done: false, value, idx })); | |
}); | |
} | |
async function step(idx) { | |
await delay(Math.random() * 500); | |
return idx; | |
} | |
function* concurrentRange(n = Number.POSITIVE_INFINITY) { | |
let idx = 0; | |
while (idx < n) { | |
console.log('request'); | |
yield step(idx); | |
idx++; | |
} | |
} | |
async function* asyncRange(n = Number.POSITIVE_INFINITY) { | |
let idx = 0; | |
while (idx < n) { | |
console.log('request') | |
await delay(Math.random() * 500); | |
yield idx; | |
idx++; | |
} | |
} | |
// The thing I want to prove works: | |
// Buffering an async iterator without turning it into a sync iterator first | |
async function* bufferConcurrent(iterable, n) { | |
const asyncIter = getIterator(iterable); | |
const pool = new Array(n); | |
for (let idx = 0; idx < n; idx++) { | |
pool[idx] = wrapWithIdx(asyncIter.next(), idx); | |
} | |
let outstandingRequests = n; | |
while (outstandingRequests) { | |
const step = await Promise.race(filter(pool, notNull)); | |
if (!step.done) { | |
pool[step.idx] = wrapWithIdx(asyncIter.next(), step.idx); | |
yield step.value; | |
} else { | |
outstandingRequests--; | |
pool[step.idx] = null; | |
} | |
} | |
} | |
// Testing | |
Deno.test("Async generation", async () => { | |
for await (const element of bufferConcurrent(asyncRange(5), 4)) { | |
console.log(element); | |
} | |
}); | |
Deno.test("Concurrent generation", async () => { | |
for await (const element of bufferConcurrent(concurrentRange(5), 4)) { | |
console.log(element); | |
} | |
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Async generation ... | |
------- output ------- | |
request | |
request | |
0 | |
request | |
1 | |
request | |
2 | |
request | |
3 | |
4 | |
----- output end ----- | |
Async generation ... ok (1s) | |
Concurrent generation ... | |
------- output ------- | |
request | |
request | |
request | |
request | |
request | |
2 | |
1 | |
0 | |
4 | |
3 | |
----- output end ----- | |
Concurrent generation ... ok (497ms) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment