Skip to content

Instantly share code, notes, and snippets.

@trvswgnr
Created July 17, 2023 09:34
Show Gist options
  • Save trvswgnr/f264b577dee3a6858340337e0b246e70 to your computer and use it in GitHub Desktop.
Save trvswgnr/f264b577dee3a6858340337e0b246e70 to your computer and use it in GitHub Desktop.
generic concurrent execution
export async function runConcurrentAsync<T, A extends IterableIterator<unknown>>(tasks: Task<T>[], argsList: A[] = [], concurrency = 5): Promise<T[]> {
const semaphore = new Semaphore(concurrency);
const promises = tasks.map(async (task, index) => {
await semaphore.acquire();
try {
const args = argsList[index] || [];
return await task(...args);
} finally {
semaphore.release();
}
});
return Promise.all(promises);
};
export async function fetchAll(urls: string[], retryPolicy = defaultRetryPolicy): Promise<Response[]> {
const controller = new AbortController();
const tasks = urls.map(url => async () => {
try {
return await fetchWithRetry(url, retryPolicy, controller.signal);
} catch (error) {
controller.abort(); // abort all requests on fatal error
throw error;
}
});
return runConcurrentAsync(tasks);
};
async function fetchWithRetry(url: string, retryPolicy: RetryPolicy, signal: AbortSignal): Promise<Response> {
let attempt = 0;
const startTime = Date.now();
while (true) {
try {
const response = await fetch(url, { signal });
return response;
} catch (error) {
if (!(error instanceof Error)) throw new Error(`unexpected error when fetching ${url}`);
attempt++;
const elapsedTime = Date.now() - startTime;
if (!retryPolicy.shouldRetry({ attempt, elapsedTime, error })) throw error;
const delay = retryPolicy.calculateDelay({ attempt, elapsedTime, error });
await new Promise(resolve => setTimeout(resolve, delay));
}
}
}
const defaultRetryPolicy: RetryPolicy = {
shouldRetry({ elapsedTime, error }) {
return isNetworkError(error) && elapsedTime < 5000;
},
calculateDelay({ attempt }) {
const delay = Math.pow(2, attempt) * 1000; // exponential backoff
const jitter = Math.random() * 1000; // jitter
return delay + jitter;
}
}
function isNetworkError(error: unknown): boolean {
return error instanceof TypeError || error instanceof DOMException
|| (error instanceof Error && error.name !== "AbortError" && error.name !== "TypeError")
}
class Semaphore {
private tasks: CallableFunction[] = [];
private count: number;
constructor(count: number) {
this.count = count;
}
async acquire() {
if (this.count > 0) {
this.count--;
return Promise.resolve();
} else {
return new Promise(resolve => {
this.tasks.push(resolve);
});
}
}
release() {
if (this.tasks.length > 0) {
const next = this.tasks.shift();
if (typeof next === "function") {
next();
}
return;
}
this.count++;
}
}
type Task<T> = (...args: unknown[]) => Promise<T>;
type RetryPolicyParams = {
elapsedTime: number;
error: Error;
attempt: number;
}
interface RetryPolicy {
shouldRetry(args: RetryPolicyParams): boolean;
calculateDelay(args: RetryPolicyParams): number;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment