Skip to content

Instantly share code, notes, and snippets.

@andybee
Last active February 17, 2022 18:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save andybee/91cf907547cf8eaa96447fced4fab94f to your computer and use it in GitHub Desktop.
Save andybee/91cf907547cf8eaa96447fced4fab94f to your computer and use it in GitHub Desktop.
Simple Pool for executing a maximum number of concurrent Javascript task functions requested ad-hoc with support for Promises
import Pool from './pool.js';
const pool = new Pool(3);
const wasteTime = (input) => new Promise((resolve) => {
setTimeout(() => resolve(input), 1000);
});
/**
* Expected output is to see 1 2 3 after a delay of 1 second, then 4 5 6 after
* another second, then 7 8 9 before 'All complete'. This shows us that only 3
* tasks are executed at a time and that each call to `performTask` returns the
* output from the task function.
*/
await Promise.all([
(async () => { console.log(await pool.performTask(() => wasteTime(1))); })(),
(async () => { console.log(await pool.performTask(() => wasteTime(2))); })(),
(async () => { console.log(await pool.performTask(() => wasteTime(3))); })(),
(async () => { console.log(await pool.performTask(() => wasteTime(4))); })(),
(async () => { console.log(await pool.performTask(() => wasteTime(5))); })(),
(async () => { console.log(await pool.performTask(() => wasteTime(6))); })(),
(async () => { console.log(await pool.performTask(() => wasteTime(7))); })(),
(async () => { console.log(await pool.performTask(() => wasteTime(8))); })(),
(async () => { console.log(await pool.performTask(() => wasteTime(9))); })(),
]);
console.log('All complete');
/**
* Limits the number of concurrent pending Promises. When the limit is reached,
* further requests are queued until the number of concurrent requests drops.
*/
export default class Pool {
constructor(maxConcurrentPendingPromises) {
this.queue = [];
this.maxConcurrentPendingPromises = maxConcurrentPendingPromises;
this.numConcurrentPendingPromises = 0;
}
/**
* Advances the pool to the next task (if any).
*/
async _next() {
if (this.numConcurrentPendingPromises >= this.maxConcurrentPendingPromises
|| this.queue.length === 0) {
return;
}
this.numConcurrentPendingPromises += 1;
try {
await this.queue.shift()();
} catch (err) {
// noop
}
this.numConcurrentPendingPromises -= 1;
this._next();
}
/**
* Executes the supplied function either immediately, or when the number of
* concurrent promises drops below the maximum for this pool. Returns a
* promise which resolves or rejects with the result of the function once
* complete.
*/
performTask(fn, ...args) {
return new Promise((resolve, reject) => {
this.queue.push(() => {
const promise = fn(...args);
promise.then(resolve, reject);
return promise;
});
this._next();
});
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment