Skip to content

Instantly share code, notes, and snippets.

@lc0305
Last active January 27, 2023 13:55
Show Gist options
  • Save lc0305/93460f6d7b5b01bd1ede626bfab8b604 to your computer and use it in GitHub Desktop.
Save lc0305/93460f6d7b5b01bd1ede626bfab8b604 to your computer and use it in GitHub Desktop.
Idiomatic TypeScript adaptation of C#'s ParallelForEachAsync
/**
* MIT License
* Copyright (c) 2022 Lucas Crämer (GitHub: lc0305)
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
/* --------------------------------------------------------------------------------------------
* concurrentMap(AllSettled) is an idiomatic TypeScript adaptation of C#'s ParallelForEachAsync
* --------------------------------------------------------------------------------------------
*/
/**
 * Asynchronous mapping callback: receives one array item and resolves to the
 * mapped value.
 */
export type AsyncMapFn<T, U> = (arrayItem: T) => Promise<U>;

/**
 * Adapter that invokes an {@link AsyncMapFn} for a single item and resolves to
 * whatever should be stored in the result array for that item (the raw mapped
 * value, or a wrapped form of it).
 */
type AsyncMapWrapperFn<T, U, R> = (
  asyncMapFn: AsyncMapFn<T, U>,
  arrayItem: T
) => Promise<R>;
/**
 * Shared engine behind the public concurrent map helpers.
 *
 * Runs `asyncMapWrapperFn(asyncMapFn, item)` for every item of `array` and
 * resolves to the results in input order.
 *
 * @param array Items to process; never mutated.
 * @param asyncMapFn Mapping callback handed to `asyncMapWrapperFn` for each item.
 * @param MAX_CONCURRENT_PROMISES Upper bound on simultaneously pending
 * invocations. The value is truncated towards zero; anything that is not at
 * least 1 afterwards (0, negatives, fractions below 1, NaN) means "no limit".
 * @param asyncMapWrapperFn Adapter that actually invokes `asyncMapFn` and
 * decides what is stored in the result array.
 * @returns A promise for the per-item results, index-aligned with `array`.
 * @throws Rejects with the first rejection produced by `asyncMapWrapperFn`.
 * In the limited mode no further items are started once a rejection has been
 * observed; invocations already in flight are not cancelled.
 */
async function concurrentMapInternal<T, U, R>(
  array: ReadonlyArray<T>,
  asyncMapFn: AsyncMapFn<T, U>,
  MAX_CONCURRENT_PROMISES: number,
  asyncMapWrapperFn: AsyncMapWrapperFn<T, U, R>
): Promise<Array<R>> {
  // Math.trunc rather than `| 0`: the bitwise form coerces to int32, so large
  // limits wrap around (2 ** 32 + 1 would become 1 and serialize everything).
  const limit = Math.trunc(MAX_CONCURRENT_PROMISES);

  // Written as a negation so that NaN also selects the unlimited path.
  if (!(limit >= 1)) {
    return await Promise.all(
      array.map((arrayItem) => asyncMapWrapperFn(asyncMapFn, arrayItem))
    );
  }

  const total = array.length;
  const workerCount = Math.min(total, limit);
  const resultArray = new Array<R>(total);

  // Shared cursor: each worker claims the next unprocessed index. This is safe
  // without locking because the claim happens synchronously between awaits.
  let nextIdx = 0;
  // Set as soon as any invocation rejects so idle workers stop claiming items
  // whose results could never be delivered to the caller.
  let failed = false;

  const runWorker = async (): Promise<void> => {
    while (!failed && nextIdx < total) {
      const localIdx = nextIdx++;
      try {
        resultArray[localIdx] = await asyncMapWrapperFn(
          asyncMapFn,
          array[localIdx]
        );
      } catch (err) {
        failed = true;
        throw err;
      }
    }
  };

  await Promise.all(Array.from({ length: workerCount }, () => runWorker()));
  return resultArray;
}
/**
 * Pass-through adapter: calls `asyncMapFn` with `arrayItem` and hands back the
 * promise it returns, unmodified. Deliberately not `async`, so the callback's
 * promise is returned as-is and a synchronous throw propagates synchronously.
 *
 * @param asyncMapFn Mapping callback to invoke.
 * @param arrayItem Item passed to the callback.
 * @returns The promise produced by `asyncMapFn`.
 */
function asyncMapWrapper<T, U>(
  asyncMapFn: AsyncMapFn<T, U>,
  arrayItem: T
): Promise<U> {
  const pending = asyncMapFn(arrayItem);
  return pending;
}
/**
 * Maps every item of `array` through `asyncMapFn` and resolves to the mapped
 * values in input order.
 *
 * IMPORTANT: the returned promise rejects as soon as any invocation of
 * `asyncMapFn` fails.
 *
 * @param array Items to map.
 * @param asyncMapFn Asynchronous mapping callback applied to each item.
 * @param MAX_CONCURRENT_PROMISES Maximum number of simultaneously pending
 * invocations; the default of 0 (like any value below 1) imposes no limit.
 * @returns A promise for the array of mapped values.
 */
export function concurrentMap<T, U>(
  array: ReadonlyArray<T>,
  asyncMapFn: AsyncMapFn<T, U>,
  MAX_CONCURRENT_PROMISES = 0
): Promise<Array<U>> {
  const mapped: Promise<Array<U>> = concurrentMapInternal<T, U, U>(
    array,
    asyncMapFn,
    MAX_CONCURRENT_PROMISES,
    asyncMapWrapper
  );
  return mapped;
}
/** `status` tag of a settled result whose mapping callback resolved. */
const FULFILLED = "fulfilled" as const;
/** `status` tag of a settled result whose mapping callback rejected. */
const REJECTED = "rejected" as const;
/**
 * Adapter that never rejects: invokes `asyncMapFn` with `arrayItem` and
 * converts the outcome into a `PromiseSettledResult`.
 *
 * @param asyncMapFn Mapping callback to invoke.
 * @param arrayItem Item passed to the callback.
 * @returns A fulfilled record carrying the mapped value, or a rejected record
 * carrying whatever the callback threw or rejected with.
 */
async function asyncMapAllSettledWrapper<T, U>(
  asyncMapFn: AsyncMapFn<T, U>,
  arrayItem: T
): Promise<PromiseSettledResult<U>> {
  let settled: PromiseSettledResult<U>;
  try {
    // The call sits inside the try so a synchronous throw is captured as well.
    const value = await asyncMapFn(arrayItem);
    settled = { status: FULFILLED, value };
  } catch (reason) {
    settled = { status: REJECTED, reason };
  }
  return settled;
}
/**
 * Maps every item of `array` through `asyncMapFn` like `concurrentMap`, but
 * records each outcome as a `PromiseSettledResult` instead of rejecting on
 * failure. The returned promise therefore only resolves once every invocation
 * has settled, and a failing item shows up as a `"rejected"` entry at its
 * index.
 *
 * @param array Items to map.
 * @param asyncMapFn Asynchronous mapping callback applied to each item.
 * @param MAX_CONCURRENT_PROMISES Maximum number of simultaneously pending
 * invocations; the default of 0 (like any value below 1) imposes no limit.
 * @returns A promise for the settled results, in input order.
 */
export function concurrentMapAllSettled<T, U>(
  array: ReadonlyArray<T>,
  asyncMapFn: AsyncMapFn<T, U>,
  MAX_CONCURRENT_PROMISES = 0
): Promise<Array<PromiseSettledResult<U>>> {
  const settled: Promise<Array<PromiseSettledResult<U>>> =
    concurrentMapInternal<T, U, PromiseSettledResult<U>>(
      array,
      asyncMapFn,
      MAX_CONCURRENT_PROMISES,
      asyncMapAllSettledWrapper
    );
  return settled;
}
/**
 * Promise-based delay.
 *
 * @param ms Delay passed to `setTimeout`, in milliseconds.
 * @returns A promise that resolves (with no value) once the timer fires.
 */
const timeout = (ms: number): Promise<void> => {
  return new Promise<void>((done) => {
    setTimeout(done, ms);
  });
};
/**
 * Self-test that starts as soon as this module is evaluated.
 *
 * Feeds the numbers 0..9999 through both concurrentMap and
 * concurrentMapAllSettled (at most 1000 pending invocations each) using a
 * callback that waits a random time of up to two seconds and then returns its
 * input plus one. Logs the elapsed wall-clock time and throws if any result is
 * missing, out of order, or not fulfilled.
 */
(async function test() {
  const max = 1e4;
  const arr: number[] = Array.from({ length: max }, (_, idx) => idx);

  const asyncFn = async (val: number): Promise<number> => {
    await timeout(Math.random() * 2_000);
    return val + 1;
  };

  const MAX_CONCURRENT_PROMISES = max / 10;
  const start = Date.now();
  // Both variants run side by side over the same input.
  const [res, resAllSettled] = await Promise.all([
    concurrentMap(arr, asyncFn, MAX_CONCURRENT_PROMISES),
    concurrentMapAllSettled(arr, asyncFn, MAX_CONCURRENT_PROMISES),
  ]);
  console.log(`Took ${Date.now() - start} ms.`);

  for (const [i, input] of arr.entries()) {
    const expected = input + 1;
    const got = res[i];
    const gotAllSettled = resAllSettled[i];

    if (expected !== got) {
      throw new Error(
        `Index at: ${i} does not match expected: ${expected} got: ${got}`
      );
    }
    if (gotAllSettled.status !== FULFILLED) {
      throw new Error(`Index at: ${i} Promise was not fulfilled`);
    }
    if (expected !== gotAllSettled.value) {
      throw new Error(
        `Index at: ${i} does not match expected: ${expected} got: ${gotAllSettled.value}`
      );
    }
  }
})();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment