Last active
February 5, 2021 17:20
-
-
Save friendlyanon/1d50297972ed15cf965ee486d6c4d56a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Drains the shared iterator in `context.queue`, passing each value to
 * `callback` and awaiting the result before taking the next value.
 *
 * The worker stops, and clears `context.ok` so that every other worker
 * sharing the same context stops after its current item, when:
 *   - the iterator reports `done`,
 *   - `callback` resolves to the abort token it was handed, or
 *   - `queue.next()` or `callback` throws / rejects.
 *
 * @param {{ queue: Iterator<*>, ok: boolean, index: number }} context
 *   Mutable state shared between workers: the source iterator, the
 *   "keep going" flag and the next index to hand to `callback`.
 * @param {function(*, number, symbol): *} callback
 *   Called as `callback(value, index, abortToken)`; its result is awaited.
 * @returns {Promise<void>} Settles when this worker has stopped; rejects with
 *   whatever `queue.next()` or `callback` threw.
 */
async function concurrentWorker(context, callback) {
  const { queue } = context;
  // Unique per worker; a callback signals "stop everything" by returning it.
  const abortToken = Symbol();
  try {
    while (context.ok) {
      const { done, value } = queue.next();
      if (done) {
        context.ok = false;
        return;
      }
      const result = await callback(value, context.index++, abortToken);
      if (result === abortToken) {
        context.ok = false;
        return;
      }
    }
  } catch (error) {
    // Clear the flag here, synchronously with the failure. Relying on the
    // caller's rejection handler is too late: sibling workers that resume in
    // the intervening microtasks would still see `ok === true` and start
    // another item after the run has already failed.
    context.ok = false;
    throw error;
  }
}
/**
 * Tells whether the requested concurrency means "use a single worker".
 *
 * Deliberately written as a negated `>` rather than `<=`: any value for which
 * `concurrency > 1` is false (including `NaN` and `undefined`) selects the
 * series path.
 *
 * @param {*} concurrency - Requested number of workers.
 * @returns {boolean} `true` unless `concurrency > 1`.
 */
const shouldRunInSeries = (concurrency) => !(concurrency > 1);
/**
 * Calls `callback` for every value of a synchronous iterable, keeping up to
 * `concurrency` calls in flight at once.
 *
 * `callback` is invoked as `callback(value, index, abortToken)` and its result
 * is awaited. Returning the supplied `abortToken` stops the iteration: no
 * further values are pulled, while calls already in flight still finish.
 *
 * @param {Iterable<*>} iterable - Source of values; only `Symbol.iterator` is used.
 * @param {number} concurrency - Maximum number of workers. Any value that is
 *   not greater than 1 runs a single worker.
 * @param {function(*, number, symbol): *} callback - Per-item handler.
 * @returns {Promise<void>} Resolves once all workers have stopped; rejects
 *   with the first error raised by a worker.
 */
export function concurrentForEach(iterable, concurrency, callback) {
  const context = {
    queue: iterable[Symbol.iterator](),
    ok: true,
    index: 0,
  };

  if (shouldRunInSeries(concurrency)) {
    return concurrentWorker(context, callback);
  }

  const workers = [];
  // `context.ok` is re-checked on every pass: a worker clears it
  // synchronously when it finds the iterator already exhausted, so short
  // inputs do not spawn `concurrency` idle workers.
  for (let started = 0; started < concurrency && context.ok; started += 1) {
    workers.push(concurrentWorker(context, callback));
  }

  return Promise.all(workers).then(
    () => undefined,
    (error) => {
      // Make the remaining workers stop after their current item.
      context.ok = false;
      throw error;
    },
  );
}
/**
 * Maps every value of `iterable` through `callback` with bounded concurrency
 * and collects the results in the order the calls complete, which is not
 * necessarily the input order.
 *
 * @param {Iterable<*>} iterable - Source of values.
 * @param {number} concurrency - Forwarded to `concurrentForEach`.
 * @param {function(*, number): *} callback - Called as `callback(value, index)`;
 *   its result is awaited.
 * @returns {Promise<Array<*>>} The mapped values in completion order.
 */
export async function concurrentUnorderedMap(iterable, concurrency, callback) {
  const results = [];
  const collect = async (value, idx) => {
    const mapped = await callback(value, idx);
    results.push(mapped);
  };
  await concurrentForEach(iterable, concurrency, collect);
  return results;
}
/**
 * Shared implementation of the ordered map variants: maps every value through
 * `callback` with bounded concurrency and returns `[index, result]` pairs
 * sorted by index, i.e. back in input order.
 *
 * @param {Iterable<*>} iterable - Source of values.
 * @param {number} concurrency - Forwarded to `concurrentForEach`.
 * @param {function(*, number): *} callback - Called as `callback(value, index)`;
 *   its result is awaited.
 * @returns {Promise<Array<[number, *]>>} Index/result pairs in ascending index order.
 */
async function mapBase(iterable, concurrency, callback) {
  const pairs = [];
  await concurrentForEach(iterable, concurrency, async (value, idx) => {
    const mapped = await callback(value, idx);
    // Results arrive in completion order; keep the index so they can be re-sorted.
    pairs.push([idx, mapped]);
  });
  pairs.sort(([left], [right]) => left - right);
  return pairs;
}
/**
 * Builds an ordered map function: the returned function runs `mapBase` and
 * hands the resulting index-sorted `[index, result]` pairs to `process`.
 *
 * @param {function(Array<[number, *]>): *} process - Turns the sorted pairs
 *   into the final result.
 * @returns {function(Iterable<*>, number, function(*, number): *): Promise<*>}
 */
const mapUsing = (process) => (iterable, concurrency, callback) =>
  mapBase(iterable, concurrency, callback).then(process);
/**
 * Extracts the mapped result (second element) from an `[index, result]` pair.
 *
 * @param {Array<*>} pair - An `[index, result]` pair.
 * @returns {*} `pair[1]`.
 */
const unwrapOrderedPair = (pair) => pair[1];
/**
 * Maps every value of an iterable through an awaited callback with bounded
 * concurrency and resolves to the results in input order.
 *
 * Signature: `concurrentMap(iterable, concurrency, callback)`, where
 * `callback` is called as `callback(value, index)`.
 */
export const concurrentMap = mapUsing((pairs) => pairs.map(unwrapOrderedPair));
/**
 * Like `concurrentMap`, but the per-item results are combined with
 * `Array.prototype.flatMap`, so a callback result that is an array is
 * flattened one level into the output. Results stay in input order.
 *
 * Signature: `concurrentFlatMap(iterable, concurrency, callback)`, where
 * `callback` is called as `callback(value, index)`.
 */
export const concurrentFlatMap =
  mapUsing((pairs) => pairs.flatMap(unwrapOrderedPair));
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment