@jcouyang
Last active December 9, 2023 16:46
Promise All with Limit of Concurrent N

The Promise.all Problem

When processing a very large array, e.g. Promise.all(A_VERY_LARGE_ARRAY_OF_XHR_PROMISE),

the browser would most likely run out of memory trying to send all the requests at the same time.

The solution is to limit the number of concurrent requests, and to wrap each promise in a thunk:

Promise.allConcurrent(2)([()=>fetch('BLAH1'), ()=>fetch('BLAH2'),...()=>fetch('BLAHN')])

With the concurrency set to 2, requests BLAH1 and BLAH2 are sent at the same time.

When BLAH1 gets its response and resolves, the request to BLAH3 is sent immediately.

This way the number of requests in flight at any moment never exceeds the limit of 2 that we just configured.
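For illustration, here is a small sketch of that scheduling, using hypothetical timer-based tasks in place of real fetch calls (the task helper below is made up for the demo):

// Hypothetical demo task: logs when it starts, resolves with `name` after `ms`.
const task = (name, ms) => () => {
  console.log(`${name} started`)
  return new Promise(resolve => setTimeout(() => resolve(name), ms))
}

Promise.allConcurrent(2)([
  task('BLAH1', 100), // finishes first...
  task('BLAH2', 500), // ...while this one is still pending...
  task('BLAH3', 100), // ...so BLAH3 starts at ~100ms, as soon as BLAH1 resolves
]).then(results => console.log(results)) // ['BLAH1', 'BLAH2', 'BLAH3']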

describe('promiseAllStepN', function(){
  describe('3 tasks, and concurrency is 2', function(){
    let tasks;
    beforeEach(function(){
      tasks = range(3).map(x => sinon.stub())
    })
    describe('1st is finished', function(){
      it('will kick off the third task', function(done){
        tasks[0].returns(Promise.resolve(0))
        let task3 = tasks[2]
        tasks[1].returns(new Promise(resolve => setTimeout(() => {
          // by the time the slow 2nd task resolves, the freed-up slot
          // should already have started the 3rd task
          expect(task3.called).to.be.equal(true)
          resolve(1)
          done()
        }, 1000)))
        tasks[2].returns(Promise.resolve(2))
        return Promise.allConcurrent(2)(tasks).then(x => console.log(x))
      })
    })
  })
  describe('10 tasks, and concurrency is 2', function(){
    let tasks;
    beforeEach(function(){
      tasks = range(10).map(x => sinon.stub())
    })
    describe('1st is finished but 2nd is stuck', function(){
      it('final task will run before 2nd', function(done){
        tasks.forEach((task, index) => task.returns(Promise.resolve(index)))
        let task10 = tasks[9]
        tasks[1].returns(new Promise(resolve => setTimeout(() => {
          expect(task10.called).to.be.equal(true)
          resolve(1)
          done()
        }, 1000)))
        return Promise.allConcurrent(2)(tasks).then(x => console.log(x))
      })
    })
  })
})
function promiseAllStepN(n, list) {
  // split the thunks: `head` starts immediately, `tail` is drained
  // one at a time as running promises resolve
  let tail = list.splice(n)
  let head = list
  let resolved = []
  let processed = 0
  return new Promise(resolve => {
    // kick off the first n thunks
    head.forEach(x => {
      let res = x()
      resolved.push(res)
      res.then(y => {
        runNext()
        return y
      })
    })
    function runNext() {
      if (processed == tail.length) {
        // every queued thunk has been started; settle with all results in order
        resolve(Promise.all(resolved))
      } else {
        // start the next queued thunk, chaining another runNext behind it
        resolved.push(tail[processed]().then(x => {
          runNext()
          return x
        }))
        processed++
      }
    }
  })
}
Promise.allConcurrent = n => list => promiseAllStepN(n, list)
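
One caveat worth noting: a rejected promise never calls runNext (the .then has no rejection handler), so each rejection drops one concurrency slot, and if every in-flight promise rejects, the remaining thunks never run and the returned promise never settles. If you want every task to run regardless, one option (a sketch, not part of the gist itself) is to catch inside each thunk:

// Sketch: wrap each thunk so a rejection resolves with the error instead,
// keeping every concurrency slot alive. `tasks` is an array of thunks as above.
const settled = tasks.map(t => () => t().catch(err => err))
Promise.allConcurrent(2)(settled).then(results => {
  // `results` now mixes successful values and Error objects
  console.log(results)
})
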
@rtuin

rtuin commented Jun 27, 2019

This is brilliant!

@wertlex

wertlex commented Dec 26, 2019

Not to diminish your solution, but since this gist ranks highly on Google I have to mention that there is a battle-tested library with a rich set of options: https://www.npmjs.com/package/bottleneck

@Chaoste

Chaoste commented Mar 6, 2020

I agree with @wertlex. It's a very lightweight module (no dependencies) and more reliable. Instead of:

    ...
    const tasks = params.map((param) => () => someDeferredFunction(param));
    return Promise.allConcurrent(2)(tasks).then(x => console.log(x))
    ...

do:

    const Bottleneck = require('bottleneck');
    ...
    const limiter = new Bottleneck({ maxConcurrent: 2 });
    const tasks = params.map((param) => limiter.schedule(() => someDeferredFunction(param)));
    return Promise.all(tasks).then(x => console.log(x))
    ...

@mzdunek93

mzdunek93 commented Apr 14, 2020

TypeScript version:

const promiseAllLimit = async <T>(n: number, list: (() => Promise<T>)[]) => {
  const head = list.slice(0, n)
  const tail = list.slice(n)
  const result: T[] = []
  const execute = async (promise: () => Promise<T>, i: number, runNext: () => Promise<void>) => {
    result[i] = await promise()
    await runNext()
  }
  const runNext = async () => {
    const i = list.length - tail.length
    const promise = tail.shift()
    if (promise !== undefined) {
      await execute(promise, i, runNext)
    }
  }
  await Promise.all(head.map((promise, i) => execute(promise, i, runNext)))
  return result
}
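
A usage sketch (fetchJson below is a hypothetical helper; any array of () => Promise<T> thunks works the same way):

const fetchJson = (url: string) => fetch(url).then(res => res.json())

const urls = ['/api/a', '/api/b', '/api/c', '/api/d']
promiseAllLimit(3, urls.map(url => () => fetchJson(url)))
  .then(results => console.log(results)) // results come back in input order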

@swayam18

swayam18 commented Jun 11, 2020

A more efficient (no splicing, only index-based operations) TypeScript version, with order preservation and early-exit error-handling semantics:

export function promiseAllN<T>(collection: Array<() => Promise<T>>, n: number = 100): Promise<T[]> {
    let i = 0;
    let jobsLeft = collection.length;
    const outcome: T[] = [];
    let rejected = false;
    // create a new promise and capture references to resolve and reject to avoid nesting of code
    let resolve, reject;
    const pendingPromise: Promise<T[]> = new Promise(function(res, rej) {
        resolve = res; reject = rej;
    });

    // Guard clause
    if (collection.length === 0) {
        resolve([]);
        return pendingPromise;
    }

    // execute the j'th thunk
    function runJob(j: number) {
        collection[j]().then(result => {
            if (rejected) {
                return; // no op!
            }
            jobsLeft--;
            outcome[j] = result;
            if (jobsLeft <= 0) {
                resolve(outcome);
            } else if (i < collection.length) {
                runJob(i);
                i++;
            } else {
                return; // nothing to do here.
            }
        }).catch(e => {
            if (rejected) {
                return; // no op!
            }
            rejected = true;
            reject(e);
            return;
        });
    }

    // bootstrap, while handling cases where the length of the given array is smaller than maxConcurrent jobs
    while (i < Math.min(collection.length, n)) {
        runJob(i); i++;
    }

    return pendingPromise;
}

Usage:

promiseAllN(items.map((i) => () => /*...*/), 50)

Though I prefer a version that avoids creating a bunch of closures at the call site, like so:

promiseAllN(items, (i) => /*...*/, 50)
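
A minimal hypothetical sketch of that mapper-based signature; it keeps the call site clean by building the thunks internally, while a fully closure-free variant would call fn(items[j]) directly inside runJob:

export function promiseMapN<T, R>(items: T[], fn: (item: T) => Promise<R>, n: number = 100): Promise<R[]> {
  // Sketch: delegate to promiseAllN above by wrapping each item in a thunk here.
  return promiseAllN(items.map((item) => () => fn(item)), n);
}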

@marlomgirardi

marlomgirardi commented Jun 29, 2020

There are already a lot of good options, so writing a custom one doesn't seem to be the best choice.
I'm using @supercharge/promise-pool. It is dependency-free, small, and has a readable API.

const PromisePool = require('@supercharge/promise-pool')
 
const users = [
  { name: 'Marcus' },
  { name: 'Norman' },
  { name: 'Christian' }
]

await PromisePool
  .for(users)
  .withConcurrency(2)
  .process(async data => {
    // Create user ...
  })
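
Worth noting (per the library's docs): process resolves with the collected outcomes instead of failing fast, so you can inspect results and errors separately. A sketch, with createUser as a hypothetical function:

const { results, errors } = await PromisePool
  .for(users)
  .withConcurrency(2)
  .process(async user => createUser(user)) // hypothetical createUser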

@webshared

webshared commented Jan 12, 2021

The pure JS implementation without the callback hell :)

const pAll = async (queue, concurrency) => {
  let index = 0;
  const results = [];

  // Run a pseudo-thread
  const execThread = async () => {
    while (index < queue.length) {
      const curIndex = index++;
      // Use of `curIndex` is important because `index` may change after await is resolved
      results[curIndex] = await queue[curIndex]();
    }
  };

  // Start threads
  const threads = [];
  for (let thread = 0; thread < concurrency; thread++) {
    threads.push(execThread());
  }
  await Promise.all(threads);
  return results;
};

and a simple use case:

const test = async () => {
  const urls = ["url1", "url2", "url3"];
  const res = await pAll(
    urls.map(url => () => makeHTTPRequest(url)),
    5
  );
  console.log(res);
};

@mborgeaud

mborgeaud commented Sep 3, 2021

@atolkachiov thanks for this brilliant JS implementation.
Let's say I'm storing the HTTP responses of each makeHTTPRequest in a global array. I'd like to retry the ones that failed (i.e. HTTP status !== 200). How should I chain a second pAll call so that it runs right after the first one completes? I tried this, but it didn't work:

var retryURLs= [];
function makeHTTPRequest(url) {
   ...
   if(httpResult != 200) retryURLs.push(url);
}

const test = async () => {
  const urls = ["url1", "url2", "url3"];
  await pAll(
    urls.map(url => () => makeHTTPRequest(url)),
    5
  );
  await pAll(
    retryURLs.map(url => () => makeHTTPRequest(url)),
    5
  );
};

@avuenja

avuenja commented Nov 19, 2021

(quoting @webshared's pAll implementation and usage example above)

This is awesome man! 🎉

@jacksonv1lle

This is a great tool, thanks!

Not sure if it is intentional, but I have noticed that if a promise in a thread rejects, the rest of that thread's queue is not executed. To prevent this I am catching the error and returning it:

results[curIndex] = await queue[curIndex]().catch(err => err);

@webshared

@jacksonv1lle it happens because Promise.all rejects as soon as any of the promises rejects. You can also wrap the call in a try/catch:

try {
  results[curIndex] = await queue[curIndex]();
} catch(err) {
  console.log("ERROR", err);
  results[curIndex] = err;
}
