@joeytwiddle
Last active September 27, 2017 11:59
Process a long list of promises in parallel, but with a concurrency limit
/**
 * Repeatedly calls doNext() for as long as conditionFn() returns a truthy value.
 * Keeps up to maxConcurrent doNext() promises in flight, starting a new one each time a running one resolves.
 *
 * Example:
 *     var toProcess = new Array(2000).fill(true);
 *     function slowProcess (item) {
 *         return promiseUtils.delay(100);
 *     }
 *     whileConcurrently(
 *         () => toProcess.length > 0,
 *         () => slowProcess( toProcess.pop() ),
 *         20
 *     );
 *
 * @param {function():boolean} conditionFn - Indicates whether there is more work to do or not. MUST BE SYNCHRONOUS!
 * @param {function():Promise<*>} doNext - Starts the next piece of work, resolving when complete
 * @param {Number} [maxConcurrent=20] - The maximum number of promises that may be in flight at once
 * @returns {Promise} Resolves once conditionFn() has returned a falsy value and all started work has completed
 *
 * See also: https://github.com/azproduction/promise-queue
 */
function whileConcurrently (conditionFn, doNext, maxConcurrent) {
    maxConcurrent = maxConcurrent || 20;
    const startWorker = function () {
        const workRemains = conditionFn();
        if (workRemains) {
            // We must call doNext() synchronously, so that no other worker can run
            // between the conditionFn() check and the start of this piece of work.
            const prom = doNext();
            // But it is not guaranteed to return a promise; it might return undefined or some other value.
            return Promise.resolve().then(() => prom).then(startWorker);
        } else {
            return Promise.resolve();
        }
    };
    // Each worker is a chain of promises that processes one piece of work at a time
    const workers = [];
    for (let i = 0; i < maxConcurrent; i++) {
        workers.push(startWorker());
    }
    return Promise.all(workers);
}

/**
 * Process a list of items in parallel, but with a concurrency limit.
 *
 * For example, to process a list of 100 objects (in this case numbers), with 20 processes running at any one time:
 *
 *     const results = await eachConcurrently([1, ..., 100], 20, num => doWorkFor(num));
 *
 * @param {Array<*|function():Promise>} list - List of input data. If mapFn is not provided, list items should be functions that return promises
 * @param {Number} maxConcurrent - The maximum number of promises that may be in flight at once
 * @param {function(*, Number, Number):Promise<*>} [mapFn] - Optional function that takes a list item and returns a promise. The function is also passed the item index and the length of the list, as second and third arguments, to match Bluebird's Promise.each().
 * @returns {Promise<Array>} Resolves with the list of results, in the same order as the input list
 */
function eachConcurrently (list, maxConcurrent, mapFn) {
    mapFn = mapFn || (x => x == null || x());
    const results = [];
    let i = 0;
    return whileConcurrently(
        () => i < list.length,
        () => {
            const j = i;
            i++;
            // The mapFn might return a value or a promise
            const jobResult = mapFn(list[j], j, list.length);
            return Promise.resolve(jobResult).then(result => {
                results[j] = result;
            });
        },
        maxConcurrent
    ).then(
        () => results
    );
}
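
The following is a minimal usage sketch, not part of the original gist. It restates the two functions above in condensed form so that it runs on its own, then doubles ten numbers with at most 3 jobs in flight. The names `delay` and `runDemo`, the input list, and the delay times are all assumptions made up for illustration. It tracks the peak number of overlapping jobs to show that the limit holds, and that results come back in input order even when jobs finish out of order.

```javascript
// Condensed copies of the two functions above, so this snippet runs on its own.
function whileConcurrently (conditionFn, doNext, maxConcurrent) {
    maxConcurrent = maxConcurrent || 20;
    const startWorker = function () {
        if (!conditionFn()) {
            return Promise.resolve();
        }
        // doNext() is called synchronously, straight after the conditionFn() check
        const prom = doNext();
        return Promise.resolve().then(() => prom).then(startWorker);
    };
    const workers = [];
    for (let i = 0; i < maxConcurrent; i++) {
        workers.push(startWorker());
    }
    return Promise.all(workers);
}

function eachConcurrently (list, maxConcurrent, mapFn) {
    mapFn = mapFn || (x => x == null || x());
    const results = [];
    let i = 0;
    return whileConcurrently(
        () => i < list.length,
        () => {
            const j = i;
            i++;
            return Promise.resolve(mapFn(list[j], j, list.length)).then(result => {
                results[j] = result;
            });
        },
        maxConcurrent
    ).then(() => results);
}

// Hypothetical helper (not from the gist): resolves after `ms` milliseconds.
const delay = ms => new Promise(resolve => setTimeout(resolve, ms));

// Hypothetical demo: doubles each number while counting how many jobs overlap.
function runDemo () {
    let running = 0;
    let peak = 0;
    const inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
    return eachConcurrently(inputs, 3, num => {
        running++;
        peak = Math.max(peak, running);
        // Vary the delay so that jobs finish out of order
        return delay((num % 3) * 10).then(() => {
            running--;
            return num * 2;
        });
    }).then(results => ({ results, peak }));
}

runDemo().then(({ results, peak }) => {
    console.log(results); // [ 2, 4, 6, 8, 10, 12, 14, 16, 18, 20 ]
    console.log(peak);    // 3
});
```

Each result is stored at the index of its input, which is why the output order matches the input order regardless of completion order. Because every worker is a single promise chain that starts its next job only after the previous one resolves, the number of overlapping jobs cannot exceed the number of workers.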