Skip to content

Instantly share code, notes, and snippets.

@thomaskonrad
Created December 27, 2020 12:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save thomaskonrad/9eea7c6c37b95ec2ac916daf148fbf88 to your computer and use it in GitHub Desktop.
Save thomaskonrad/9eea7c6c37b95ec2ac916daf148fbf88 to your computer and use it in GitHub Desktop.
Script to execute multiple Promises concurrently with a certain concurrency limit
import executeConcurrently from '@/generic/executeConcurrently';
// Each successive execution gets a shorter timeout (ceil(100 / counter) ms).
test('Concurrent execution works with decreasing execution time', async () => {
  const total = 10;
  const limit = 3;
  const timeoutFor = (counter: number): number => Math.ceil(100 / counter);

  const resolvedCount = await testExecution(total, limit, timeoutFor);

  // Every scheduled execution must have been resolved.
  expect(resolvedCount).toEqual(total);
});
// Each successive execution gets a longer timeout: 100 / (total - counter + 1) ms.
test('Concurrent execution works with increasing execution time', async () => {
  const total = 10;
  const limit = 3;
  const timeoutFor = (counter: number): number => {
    const remaining = total - counter + 1;
    return 100 / remaining;
  };

  const resolvedCount = await testExecution(total, limit, timeoutFor);

  // Every scheduled execution must have been resolved.
  expect(resolvedCount).toEqual(total);
});
// The fifth execution is rejected; the overall run must reject with the same reason.
test('Concurrent execution fails if one execution fails', async () => {
  const total = 10;
  const limit = 3;
  const constantTimeout = (): number => 1;
  const rejectsFifth = (counter: number): boolean => counter === 5;

  const run = testExecution(total, limit, constantTimeout, rejectsFifth);

  await expect(run).rejects.toEqual('Rejected');
});
/**
 * Test helper that feeds `executeConcurrently` with timer-based promises.
 *
 * A 1-based counter is incremented for every requested execution. Each
 * execution settles after `getTimeoutInMs(counter)` milliseconds: it is
 * rejected with the string `'Rejected'` when `isToBeRejected(counter)` returns
 * true, and resolved otherwise. Once `numberOfPromises` executions have been
 * handed out, the supplier returns `null`.
 *
 * @param numberOfPromises - How many executions to hand out before returning `null`.
 * @param concurrencyLimit - Passed through to `executeConcurrently`.
 * @param getTimeoutInMs - Maps the execution counter to its timer delay in milliseconds.
 * @param isToBeRejected - Decides per counter whether the execution rejects; defaults to never.
 * @returns Whatever `executeConcurrently` resolves with (the tests compare it
 *   against the number of executions).
 */
async function testExecution(
  numberOfPromises: number,
  concurrencyLimit: number,
  getTimeoutInMs: (counter: number) => number,
  isToBeRejected: (counter: number) => boolean = () => false,
): Promise<number> {
  let counter = 0;

  const getNextExecution = (): Promise<void> | null => {
    counter += 1;
    if (counter > numberOfPromises) {
      return null;
    }

    // Capture the counter now; the shared variable keeps changing before the timer fires.
    const currentCounter = counter;
    return new Promise<void>((resolve, reject) => {
      setTimeout(() => {
        if (isToBeRejected(currentCounter)) {
          // The rejection reason stays a plain string because the tests match on it.
          reject('Rejected');
        } else {
          resolve();
        }
      }, getTimeoutInMs(currentCounter));
    });
  };

  return await executeConcurrently(getNextExecution, concurrencyLimit);
}
/**
 * Runs promises obtained from `getNextPromise` with at most `concurrencyLimit`
 * of them pending at any time.
 *
 * `getNextPromise` is called whenever a slot is free. Returning `null` (or
 * `undefined`) signals that there is nothing more to run; the supplier is not
 * called again afterwards.
 *
 * @param getNextPromise - Supplier of the next promise to track, or `null` when exhausted.
 * @param concurrencyLimit - Maximum number of simultaneously pending promises; must be positive.
 * @returns A promise that resolves with the number of resolved promises once
 *   the supplier is exhausted and all started promises have resolved. This
 *   includes resolving with `0` when the supplier is exhausted on its first
 *   call. It rejects with the first rejection reason, with an error thrown by
 *   the supplier, or with a `RangeError` when `concurrencyLimit` is not
 *   positive. After a failure no further promises are requested.
 */
export default function executeConcurrently(
  getNextPromise: () => Promise<any> | null,
  concurrencyLimit: number,
): Promise<any> {
  return new Promise((resolve, reject) => {
    if (concurrencyLimit <= 0) {
      // A non-positive limit could never start anything and would leave the result pending forever.
      reject(new RangeError('concurrencyLimit must be greater than 0'));
      return;
    }

    let numberOfActivePromises = 0;
    let numberOfResolvedPromises = 0;
    let lastPromiseReceived = false;
    let failed = false;

    const fail = (error: unknown): void => {
      failed = true;
      reject(error);
    };

    const fillFreeSlots = (): void => {
      while (!failed && !lastPromiseReceived && numberOfActivePromises < concurrencyLimit) {
        let nextPromise: Promise<any> | null;
        try {
          nextPromise = getNextPromise();
        } catch (error) {
          // A throwing supplier fails the whole run instead of escaping from a settle handler.
          fail(error);
          return;
        }

        if (nextPromise == null) {
          lastPromiseReceived = true;
          break;
        }

        numberOfActivePromises += 1;
        // Two-argument `then` keeps the rejection handler from also seeing
        // errors raised by the fulfilment handler, so each promise is counted once.
        nextPromise.then(
          () => {
            numberOfActivePromises -= 1;
            numberOfResolvedPromises += 1;
            if (!failed) {
              fillFreeSlots();
            }
          },
          (error: unknown) => {
            numberOfActivePromises -= 1;
            fail(error);
          },
        );
      }

      // Finished: nothing left to request and nothing pending. This also
      // covers a supplier that is exhausted on its very first call.
      if (!failed && lastPromiseReceived && numberOfActivePromises === 0) {
        resolve(numberOfResolvedPromises);
      }
    };

    fillFreeSlots();
  });
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment