Skip to content

Instantly share code, notes, and snippets.

@friendlyanon
Last active February 5, 2021 17:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save friendlyanon/1d50297972ed15cf965ee486d6c4d56a to your computer and use it in GitHub Desktop.
Save friendlyanon/1d50297972ed15cf965ee486d6c4d56a to your computer and use it in GitHub Desktop.
/**
 * Drains items from a shared iterator, invoking `callback` on each one in turn.
 *
 * Several workers may share the same `context`; because `queue.next()` is
 * called synchronously, each item is handed to exactly one worker.
 *
 * @param {{ queue: Iterator<*>, ok: boolean, index: number }} context
 *   Shared state: `queue` is the iterator being drained, `ok` is the
 *   "keep going" flag observed by every worker, `index` is the running
 *   item counter passed to `callback`.
 * @param {function(*, number, symbol): *} callback
 *   Called as `callback(value, index, abortToken)`; its result is awaited.
 *   Resolving to `abortToken` stops this worker and clears `context.ok`.
 * @returns {Promise<void>} Settles when this worker stops; rejects with
 *   whatever `callback` (or the iterator) threw.
 */
async function concurrentWorker(context, callback) {
  const { queue } = context;
  const abortToken = Symbol('abort');
  try {
    while (context.ok) {
      const { done, value } = queue.next();
      if (done) {
        context.ok = false;
        return;
      }
      const result = await callback(value, context.index++, abortToken);
      if (result === abortToken) {
        context.ok = false;
        return;
      }
    }
  } catch (error) {
    // Clear the shared flag right away so sibling workers stop pulling new
    // items as soon as their current callback settles, instead of carrying
    // on until some outer rejection handler gets a chance to run.
    context.ok = false;
    throw error;
  }
}
/**
 * Decides whether work should run on a single worker.
 *
 * Anything that does not compare greater than 1 (including `NaN` and
 * `undefined`) selects series mode.
 *
 * @param {*} concurrency Requested number of workers.
 * @returns {boolean} `true` when only one worker should be used.
 */
const shouldRunInSeries = (concurrency) => {
  const wantsParallel = concurrency > 1;
  return wantsParallel === false;
};
/**
 * Runs `callback` over every item of `iterable` using up to `concurrency`
 * workers that share a single iterator.
 *
 * @param {Iterable<*>} iterable Source of items.
 * @param {number} concurrency Maximum number of workers; values that are
 *   not greater than 1 use a single worker.
 * @param {function(*, number, symbol): *} callback Invoked by the workers
 *   as `callback(value, index, abortToken)`.
 * @returns {Promise<void>} Resolves once all workers have stopped; rejects
 *   with the first error raised by a worker.
 */
export function concurrentForEach(iterable, concurrency, callback) {
  const context = {
    queue: iterable[Symbol.iterator](),
    ok: true,
    index: 0,
  };

  if (shouldRunInSeries(concurrency)) {
    return concurrentWorker(context, callback);
  }

  // Stop spawning as soon as a worker has observed the end of the queue.
  const pending = [];
  let spawned = 0;
  while (spawned < concurrency && context.ok) {
    pending.push(concurrentWorker(context, callback));
    spawned += 1;
  }

  const discardResults = () => {};
  const haltAndRethrow = (error) => {
    context.ok = false;
    throw error;
  };
  return Promise.all(pending).then(discardResults, haltAndRethrow);
}
/**
 * Maps `iterable` through `callback` concurrently, collecting results in
 * the order in which the callbacks complete rather than in input order.
 *
 * @param {Iterable<*>} iterable Source of items.
 * @param {number} concurrency Maximum number of workers.
 * @param {function(*, number): *} callback Mapper called as
 *   `callback(value, index)`; its result is awaited.
 * @returns {Promise<Array<*>>} The mapped values in completion order.
 */
export async function concurrentUnorderedMap(iterable, concurrency, callback) {
  const collected = [];
  const collect = async (item, position) => {
    const mapped = await callback(item, position);
    collected.push(mapped);
  };
  await concurrentForEach(iterable, concurrency, collect);
  return collected;
}
/**
 * Maps `iterable` through `callback` concurrently and returns
 * `[index, result]` pairs sorted by ascending index.
 *
 * @param {Iterable<*>} iterable Source of items.
 * @param {number} concurrency Maximum number of workers.
 * @param {function(*, number): *} callback Mapper called as
 *   `callback(value, index)`; its result is awaited.
 * @returns {Promise<Array<[number, *]>>} Pairs ordered by input index.
 */
async function mapBase(iterable, concurrency, callback) {
  const indexedResults = [];
  const record = async (item, position) => {
    const mapped = await callback(item, position);
    indexedResults.push([position, mapped]);
  };
  await concurrentForEach(iterable, concurrency, record);
  // Callbacks finish in arbitrary order; restore the input order in place.
  indexedResults.sort(([leftIndex], [rightIndex]) => leftIndex - rightIndex);
  return indexedResults;
}
/**
 * Builds a mapping function that runs `mapBase` and then hands the
 * index-ordered pairs to `process`.
 *
 * @param {function(Array<[number, *]>): *} process Post-processor applied
 *   to the ordered `[index, result]` pairs.
 * @returns {function(Iterable<*>, number, function): Promise<*>} A function
 *   taking `(iterable, concurrency, callback)`.
 */
const mapUsing = (process) => {
  return (iterable, concurrency, callback) => {
    const orderedPairs = mapBase(iterable, concurrency, callback);
    return orderedPairs.then(process);
  };
};
/**
 * Extracts the element at position 1 of a pair (the result half of an
 * `[index, result]` pair).
 *
 * @param {Array<*>} pair The pair to unwrap.
 * @returns {*} The value stored at `pair[1]`.
 */
const unwrapOrderedPair = (pair) => {
  const { 1: payload } = pair;
  return payload;
};
/**
 * Concurrent map whose resolved array holds the callback results in the
 * same order as the input items.
 *
 * Called as `concurrentMap(iterable, concurrency, callback)`.
 */
export const concurrentMap = mapUsing((orderedPairs) =>
  orderedPairs.map((pair) => unwrapOrderedPair(pair)));
/**
 * Concurrent flat-map: results are taken in input order and flattened one
 * level, as with `Array.prototype.flatMap`.
 *
 * Called as `concurrentFlatMap(iterable, concurrency, callback)`.
 */
export const concurrentFlatMap = mapUsing((orderedPairs) =>
  orderedPairs.flatMap((pair) => unwrapOrderedPair(pair)));
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment