Skip to content

Instantly share code, notes, and snippets.

@jerrylususu
Created February 17, 2024 12:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jerrylususu/57ce05d868aa3fd8a0c1d5084b0d0076 to your computer and use it in GitHub Desktop.
Save jerrylususu/57ce05d868aa3fd8a0c1d5084b0d0076 to your computer and use it in GitHub Desktop.
promise concurrency
/** A zero-argument factory that starts an asynchronous task when invoked. */
type PromiseFunction<T> = () => Promise<T>;

/**
 * Outcome of a single task, shaped like the entries produced by
 * `Promise.allSettled`.
 */
type PromiseResult<T> = { status: 'fulfilled', value: T } | { status: 'rejected', reason: any };

/**
 * Runs every task factory in `promises`, keeping at most `concurrency` of
 * them in flight at once, and reports how each one settled.
 *
 * @param promises - Task factories; each is invoked exactly once, in order.
 * @param concurrency - Maximum number of tasks allowed to run simultaneously.
 *   Must be greater than zero (fractional values are rounded up).
 * @returns One result per task, at the same index as its factory. An empty
 *   input yields an empty array.
 * @throws RangeError (as a rejection) when `concurrency` is zero, negative or
 *   `NaN`, since no task could ever be started with such a limit.
 */
async function allSettledWithConcurrency<T>(promises: PromiseFunction<T>[], concurrency: number): Promise<PromiseResult<T>[]> {
  // Written as a negated comparison so that NaN is rejected as well.
  if (!(concurrency > 0)) {
    throw new RangeError(`concurrency must be a positive number, got ${concurrency}`);
  }

  const results: PromiseResult<T>[] = new Array(promises.length);
  let nextIndex = 0;

  // Each worker repeatedly claims the next unclaimed task until none remain.
  // Claiming happens synchronously, so no two workers ever take the same index.
  const worker = async (): Promise<void> => {
    while (nextIndex < promises.length) {
      const currentIndex = nextIndex++;
      try {
        // Calling the factory inside `try` also captures synchronous throws
        // as a rejected result instead of letting them escape.
        const value = await promises[currentIndex]();
        results[currentIndex] = { status: 'fulfilled', value };
      } catch (reason) {
        results[currentIndex] = { status: 'rejected', reason };
      }
    }
  };

  // Never spawn more workers than there are tasks; with no tasks this is an
  // empty list and `Promise.all` resolves immediately.
  const workerCount = Math.min(Math.ceil(concurrency), promises.length);
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}
/**
 * Example usage: builds four sample task factories, hands them to
 * `allSettledWithConcurrency` with 3 as the concurrency argument, and logs
 * the array it resolves with.
 */
async function demo() {
  const firstTask = () => Promise.resolve('Result 1');
  // Resolves only once a one-second timer fires.
  const delayedTask = () =>
    new Promise(done => {
      setTimeout(() => done('Result 2'), 1000);
    });
  const failingTask = () => Promise.reject(new Error('Error in 3'));
  const lastTask = () => Promise.resolve('Result 4');

  const settled = await allSettledWithConcurrency(
    [firstTask, delayedTask, failingTask, lastTask],
    3
  );
  console.log(settled);
}

demo();
/**
 * A zero-argument factory that starts an asynchronous task when invoked.
 * The `any` default is kept so existing unparameterised uses keep compiling.
 */
type PromiseFunction<T = any> = () => Promise<T>;

/**
 * Runs every factory in `promiseFns`, never allowing more than
 * `concurrencyLimit` tasks to be in flight at once, and collects how each
 * one settled.
 *
 * @param promiseFns - Task factories; each is invoked once, in array order.
 * @param concurrencyLimit - Maximum number of simultaneously running tasks.
 *   Must be greater than zero.
 * @returns One `PromiseSettledResult` per factory, at the matching index.
 * @throws RangeError (as a rejection) when `concurrencyLimit` is zero,
 *   negative or `NaN`; such a limit would otherwise make the call wait
 *   forever because no task could ever be started.
 */
async function allSettledWithConcurrency<T>(
  promiseFns: Array<PromiseFunction<T>>,
  concurrencyLimit: number
): Promise<PromiseSettledResult<T>[]> {
  // Negated comparison so that NaN fails the check too.
  if (!(concurrencyLimit > 0)) {
    throw new RangeError(
      `concurrencyLimit must be a positive number, got ${concurrencyLimit}`
    );
  }

  const results: PromiseSettledResult<T>[] = new Array(promiseFns.length);
  // Tasks that have been started but have not settled yet.
  const inFlight = new Set<Promise<void>>();

  // Runs one factory and records its outcome; never rejects, because both
  // synchronous throws and rejections are captured by the try/catch.
  const settleInto = async (fn: PromiseFunction<T>, idx: number): Promise<void> => {
    try {
      const value = await fn();
      results[idx] = { status: "fulfilled", value };
    } catch (reason) {
      results[idx] = { status: "rejected", reason };
    }
  };

  for (const [idx, fn] of promiseFns.entries()) {
    // At capacity: wait until at least one running task has finished. The
    // cleanup below runs before the raced promise settles, so the set has
    // already shrunk when execution resumes here.
    while (inFlight.size >= concurrencyLimit) {
      await Promise.race(inFlight);
    }
    const task: Promise<void> = settleInto(fn, idx).finally(() => {
      inFlight.delete(task);
    });
    inFlight.add(task);
  }

  // Wait for whatever is still running after the last task was started.
  await Promise.all(inFlight);
  return results;
}
/**
 * Example usage: passes four sample task factories to
 * `allSettledWithConcurrency` together with a limit argument of 3, then
 * prints whatever array comes back.
 */
async function demo() {
  const limit = 3;

  // Factory for a task that resolves with `value` after `ms` milliseconds.
  const resolveAfter = (ms: number, value: string) => () =>
    new Promise(resolve => setTimeout(() => resolve(value), ms));

  const sampleTasks = [
    () => Promise.resolve('Result 1'),
    resolveAfter(1000, 'Result 2'), // Delayed resolve
    () => Promise.reject(new Error('Error in 3')),
    () => Promise.resolve('Result 4'),
  ];

  const outcome = await allSettledWithConcurrency(sampleTasks, limit);
  console.log(outcome);
}

demo();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment