Created
January 28, 2019 23:43
-
-
Save IceCreamYou/581915acec8fc7a31d670936e14395a8 to your computer and use it in GitHub Desktop.
A simple way to run promises with limited concurrency.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Creates and immediately starts running a new PromisePool. | |
* | |
* Note the pool currently has a fixed size, but dynamic sizing could be added. Also fns for size, isRunning. Does not track return values. | |
* | |
* @param producer | |
* A function that does some work concurrently. If the function returns a | |
* Promise, the producer will be called again when the Promise resolves and | |
* the new work will be added to the stack. If the function returns nothing, | |
* the pool will drain and stop. | |
* @param concurrency | |
* The number of producers to run at once. | |
*/ | |
function PromisePool(producer, concurrency) { | |
this.isRunning = true; | |
this.pool = []; | |
for (let i = 0; i < concurrency; i++) { | |
this._push(this._producerToItem(producer)); | |
} | |
} | |
/** | |
* Adds work to the pool. | |
* | |
* @param v A Promise to add to the pool. | |
*/ | |
PromisePool.prototype._push = function(v) { | |
if (v) this.pool.push(v); | |
} | |
/** | |
* Drains the pool. Returns a Promise that resolves when the pool is empty. | |
*/ | |
PromisePool.prototype.stop = async function() { | |
this.isRunning = false; | |
await Promise.all(this.pool); | |
console.log('Stopped with pool size: ' + this.pool.length); | |
} | |
/** | |
* Converts a function that does work into something the pool understands. | |
* | |
* @param producer A function that does work and returns a Promise. | |
*/ | |
PromisePool.prototype._producerToItem = function(producer) { | |
const pr = producer(); | |
if (!pr) { | |
this.stop(); | |
return; | |
} | |
const result = pr.then(() => { | |
Utils.removeFromArray(result, this.pool); | |
if (this.isRunning) this._push(this._producerToItem(producer, this.pool)); | |
console.log('Pool size: ' + this.pool.length); | |
}); | |
return result; | |
} | |
/** | |
* Creates a `PromisePool` from a list of promises. | |
* | |
* @param arr The list of promises. | |
* @param conc The concurrency. | |
*/ | |
PromisePool.fromArray = function(arr, conc) { | |
const pool = new PromisePool(() => { | |
if (arr.length) return arr.shift(); | |
}, conc); | |
return pool; | |
} | |
// ===== | |
// Tests | |
// ===== | |
// An example Promise-generating function; does nothing for `n` milliseconds | |
async function sleep(n) { | |
await new Promise((resolve) => setTimeout(resolve, n)); | |
console.log('slept for ' + n + 'ms'); | |
} | |
// An example Promise-generating function; sleeps for a random time then logs how many times it has been called | |
var i = 0; | |
async function doThings() { | |
await sleep(Math.floor(Math.random() * 500)); | |
console.log(++i); | |
} | |
// Runs doThings with a concurrency of 4 until `p.stop()` is called | |
// On average, this should run `doThings` 16 times per second: | |
// 4 concurrency * (1000ms / average of 250ms sleep per doThings() call | |
var p = new PromisePool(doThings, 4); | |
// Stop running `doThings` after 2 seconds, then test the array version | |
setTimeout(() => p.stop().then(() => PromisePool.fromArray([ | |
sleep(100), | |
sleep(200), | |
sleep(300), | |
sleep(400), | |
sleep(500), | |
sleep(600), | |
sleep(700), | |
], 3)), 2000); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment