Skip to content

Instantly share code, notes, and snippets.

@IceCreamYou
Created January 28, 2019 23:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save IceCreamYou/581915acec8fc7a31d670936e14395a8 to your computer and use it in GitHub Desktop.
Save IceCreamYou/581915acec8fc7a31d670936e14395a8 to your computer and use it in GitHub Desktop.
A simple way to run promises with limited concurrency.
/**
* Creates and immediately starts running a new PromisePool.
*
* Note the pool currently has a fixed size, but dynamic sizing could be added. Also fns for size, isRunning. Does not track return values.
*
* @param producer
* A function that does some work concurrently. If the function returns a
* Promise, the producer will be called again when the Promise resolves and
* the new work will be added to the stack. If the function returns nothing,
* the pool will drain and stop.
* @param concurrency
* The number of producers to run at once.
*/
function PromisePool(producer, concurrency) {
this.isRunning = true;
this.pool = [];
for (let i = 0; i < concurrency; i++) {
this._push(this._producerToItem(producer));
}
}
/**
* Adds work to the pool.
*
* @param v A Promise to add to the pool.
*/
PromisePool.prototype._push = function(v) {
if (v) this.pool.push(v);
}
/**
* Drains the pool. Returns a Promise that resolves when the pool is empty.
*/
PromisePool.prototype.stop = async function() {
this.isRunning = false;
await Promise.all(this.pool);
console.log('Stopped with pool size: ' + this.pool.length);
}
/**
* Converts a function that does work into something the pool understands.
*
* @param producer A function that does work and returns a Promise.
*/
PromisePool.prototype._producerToItem = function(producer) {
const pr = producer();
if (!pr) {
this.stop();
return;
}
const result = pr.then(() => {
Utils.removeFromArray(result, this.pool);
if (this.isRunning) this._push(this._producerToItem(producer, this.pool));
console.log('Pool size: ' + this.pool.length);
});
return result;
}
/**
* Creates a `PromisePool` from a list of promises.
*
* @param arr The list of promises.
* @param conc The concurrency.
*/
PromisePool.fromArray = function(arr, conc) {
const pool = new PromisePool(() => {
if (arr.length) return arr.shift();
}, conc);
return pool;
}
// =====
// Tests
// =====
// An example Promise-generating function; does nothing for `n` milliseconds
async function sleep(n) {
await new Promise((resolve) => setTimeout(resolve, n));
console.log('slept for ' + n + 'ms');
}
// An example Promise-generating function; sleeps for a random time then logs how many times it has been called
var i = 0;
async function doThings() {
await sleep(Math.floor(Math.random() * 500));
console.log(++i);
}
// Runs doThings with a concurrency of 4 until `p.stop()` is called
// On average, this should run `doThings` 16 times per second:
// 4 concurrency * (1000ms / average of 250ms sleep per doThings() call
var p = new PromisePool(doThings, 4);
// Stop running `doThings` after 2 seconds, then test the array version
setTimeout(() => p.stop().then(() => PromisePool.fromArray([
sleep(100),
sleep(200),
sleep(300),
sleep(400),
sleep(500),
sleep(600),
sleep(700),
], 3)), 2000);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment