@alexpsi
Created February 25, 2017 20:31
Like Promise.all but executed in chunks, so you can have some async concurrency control.
const chunks = (arr, chunkSize) => {
	const results = [];
	// slice, not splice, so the caller's array isn't destroyed
	for (let i = 0; i < arr.length; i += chunkSize) results.push(arr.slice(i, i + chunkSize));
	return results;
};
module.exports = (xs, f, concurrency) => {
	if (xs.length === 0) return Promise.resolve();
	// chain the chunks sequentially; the items inside each chunk run in parallel
	// (reduce already yields a single promise; wrapping it in Promise.all would throw, since a promise is not iterable)
	return chunks(xs, concurrency).reduce(
		(acc, chunk) => acc.then(() => Promise.all(chunk.map(f))),
		Promise.resolve()
	);
};
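A minimal usage sketch (the filename and call site are hypothetical; note the chain resolves once every chunk has finished, but per-item results are not collected):

const chunkedAll = require('./chunked-all'); // hypothetical filename for this gist
const delay = t => new Promise(r => setTimeout(r, t, t));

// runs 500 and 3000 together, then 800 and 2000, then 1500: at most 2 tasks in flight per chunk
chunkedAll([500, 3000, 800, 2000, 1500], delay, 2).then(() => console.log('all chunks done'));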
@caub

caub commented Dec 14, 2018

what you wrote, in condensed form:

const all = async (xs, f, n=Infinity) => {
	const chunks = Array.from({length: Math.ceil(xs.length/n)||1}, (_,i)=>xs.slice(i*n, (i+1)*n));
	for (const chunk of chunks) await Promise.all(chunk.map(f));
}
var delay=t=>new Promise(r=>setTimeout(r,t))
console.time(1); await all([500,3000,800,2000,1500], delay, 2); console.timeEnd(1)
// 1: 6501.330078125ms

@caub

caub commented Mar 22, 2019

a smoother version:

var concurrent=async (xs, f, n=Infinity) => {
	const finished = Symbol();
	let promises = xs.slice(0, n).map(f), others = xs.slice(n);
	while (promises.length) {
		await Promise.race(promises.map(promise => promise.then(() => {promise[finished] = true;})));
		promises = promises.filter(promise => !promise[finished]);
		promises.push(...others.splice(0, n - promises.length).map(f));
	}
};
var delay = t => new Promise(r=>setTimeout(r, t))
console.time(1); await concurrent([500,3000,800,2000,1500], delay, 2); console.timeEnd(1)
// 1: 4501.019775390625ms

@walidbagh

👍

@churchofthought

churchofthought commented Dec 30, 2020

@caub Nice code. I didn't know about Symbol(); it's very reminiscent of Ruby, and it's great to learn new things about ECMAScript. Such a banger of a language! Thank you for writing such nice code. I can see how you pull new promises in as soon as a single one finishes, using Promise.race. Unfortunately this code does not return the values of the promises. Additionally, .filter is quite an expensive operation to run on every completion. It seems to me it should be possible to do away with a bit of the functional cruft, turn this into a .map, and make it much faster, while still retaining the elegance. Gonna work on it for a bit, because all the libs out there are way overblown for what amounts to just wanting to limit concurrency :-)
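For anyone else meeting Symbol() for the first time, a tiny sketch of why it makes a safe tag (every call returns a unique key, and symbol keys stay invisible to JSON and for...in):

const finished = Symbol();
console.log(Symbol() === Symbol());             // false: each Symbol() is unique, so the tag can't collide with anything
const p = Promise.resolve();
p[finished] = true;                             // tags the promise in place
console.log(JSON.stringify(p), Object.keys(p)); // "{}" and []: the symbol key stays hidden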

@churchofthought

churchofthought commented Dec 30, 2020

Here is my best effort. Let me know what you guys think!
@caub @walidbagh

Array.prototype.parallelMap = async function (f, n = Infinity) {
	const results = Array(this.length)
	const keysLeft = this.entries()

	const next = async () => {
		const entry = keysLeft.next()
		if (entry.done) return
		const [key, val] = entry.value
		results[key] = await f(val, key)
		await next()
	}

	await Promise.all([...Array(Math.min(this.length, n))].map(next))

	return results
}

var delay = t => new Promise(r => setTimeout(r, t, t))

for (var n = 1; n <= 5; ++n) {
	const tid = `${n} workers`
	console.time(tid); console.log(await [100, 200, 400, 800, 1600].parallelMap(delay, n)); console.timeEnd(tid)
}

(5) [100, 200, 400, 800, 1600]
1 workers: 3114.070068359375 ms
(5) [100, 200, 400, 800, 1600]
2 workers: 2107.2900390625 ms
(5) [100, 200, 400, 800, 1600]
3 workers: 1803.89990234375 ms
(5) [100, 200, 400, 800, 1600]
4 workers: 1703.367919921875 ms
(5) [100, 200, 400, 800, 1600]
5 workers: 1602.065673828125 ms

@churchofthought

I've posted my own gist to keep this code updated.
Will be publishing an NPM module eventually.

https://gist.github.com/churchofthought/b1a937929de44afb322ab850b4a3f169

@caub

caub commented Dec 30, 2020

@churchofthought it's better not to add things to Array.prototype

I tried to improve my snippet, here's another version:

const delay = t => new Promise(r=>setTimeout(r, t));

async function concurrent(fns, n=Infinity) {
	let promises = fns.slice(0, n).map(f => f());
	let stack = fns.slice(n); // remaining ones to process
	while (promises.length > 0) {
		await Promise.race(
			promises.map(p => p.then(() => {
				const f = stack.shift();
				promises = [...promises.filter(pi => pi!==p), ...f?[f()]:[]];
			}))
		);
	}
}

for (let i=1; i<=5; i++) {
	console.time(i); await concurrent([500,3000,800,2000,1500].map(t=>()=>delay(t)), i); console.timeEnd(i);
}
// 1: 7801.465087890625 ms
// 2: 4500.680908203125 ms
// 3: 3000.656005859375 ms
// 4: 3000.639892578125 ms
// 5: 3000.586181640625 ms

@churchofthought

Fair enough, here is a non-prototype version @caub:

const parallelMap = async (arr, f, n = Infinity) => {
	const results = Array(arr.length)
	const keysLeft = arr.entries()

	const next = async () => {
		const entry = keysLeft.next()
		if (entry.done) return
		const [key, val] = entry.value
		results[key] = await f(val, key)
		await next()
	}

	await Promise.all([...Array(Math.min(arr.length, n))].map(next))

	return results
}

const delay = t => new Promise(r => setTimeout(r, t, t))

for (var n = 1; n <= 5; ++n) {
	const tid = `${n} workers`
	console.time(tid); console.log(await parallelMap([100, 200, 400, 800, 1600], delay, n)); console.timeEnd(tid)
}

I don't mean to disparage you but your code is allocating memory all over the place with its use of map, slice, filter, and shift. Promise.race isn't necessary either.

@caub

caub commented Dec 31, 2020

I don't mean to disparage you but your code is allocating memory all over the place with its use of map, slice, filter, and shift. Promise.race isn't necessary either.

Well, I don't think allocating memory for those things matters in JS; using .slice etc. is common and even encouraged. Your recursive approach is not very clear, to be honest. It's better to optimize for readability than for performance (we often get it wrong anyway, since V8 optimizes things for you).

Last, I used function wrappers returning promises as the first argument of my concurrent() function; this is a common pattern, used in https://caolan.github.io/async for example.

Promise.race is relevant; it literally does what it's supposed to:

  • Take the first n promises
  • As soon as one is done (Promise.race), replace it with the next promise to run from the stack
  • Iterate until all promises have run (a concrete timeline follows below)
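A concrete timeline for the test above ([500, 3000, 800, 2000, 1500] with n = 2), showing where the measured ~4500 ms comes from:

// t=0:    both slots fill: 500 and 3000 start
// t=500:  500 settles  -> 800 takes its slot
// t=1300: 800 settles  -> 2000 takes the slot
// t=3000: 3000 settles -> 1500 takes the second slot
// t=3300: 2000 settles -> nothing left to schedule
// t=4500: 1500 settles -> done: total ~4500 ms, matching the measurement above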

@caub

caub commented Jan 1, 2021

At second glance, I understand your approach; it makes total sense and is more practical to implement. Thanks for sharing, it's good to have both an iterative and a recursive solution.

@churchofthought

At second glance, I understand your approach; it makes total sense and is more practical to implement. Thanks for sharing, it's good to have both an iterative and a recursive solution.

You're welcome. Thank you for understanding. It can be converted to a promise-based solution without recursion, if async is removed and .then chaining is used instead, but the code will be significantly more complex.

I was actually worried about the recursion and stack overflow, because apparently many JS engines are adding async stack traces, which would mean async recursion isn't equivalent to promise chaining anymore.

This code still seems to perform fine on 20,000+ elements with 10 workers, so I am not sure what the deal is. Perhaps the async stack-trace flag needs to be set in Node to see the damage the recursion brings.

Nonetheless, you are right about the recursion, and it would be best to fix it up. If you want to give it a shot, that'd be awesome; otherwise I will try sometime this week.

I plan to release an NPM package with parallelMap and parallelDo, because all the existing packages are overblown and confusing.

@walidbagh

@caub @churchofthought All I see here is improvement; both methods are valid, and the performance difference is pretty small.
Thanks to both of you for improving this gist!

@churchofthought

churchofthought commented Jan 1, 2021

@walidbagh You're welcome. Thanks for making the gist!

Last, I used function wrappers returning promises as the first argument of my concurrent() function; this is a common pattern, used in https://caolan.github.io/async for example.

Thanks for letting me know about this. It is a good idea to follow the lead of Promise.all: Promise.all([5, 4, 3]) handles non-promises just fine, so it would make sense to do so as well.

@caub

caub commented Jan 1, 2021

@churchofthought

Oh yeah, 20k+ items; now I see why perf matters.

I've adapted the non-recursive version a bit below. I don't see other ways to do it: it seems Promise.race is necessary here, while Promise.all is what the recursive version needs.

async function concurrent(fns, n=Infinity) {
	const results = fns.map(() => undefined);

	// keep iteration efficient by holding a Set of the indexes of the fns currently running; initially it's [0, 1, ..., n-1]
	const indexes = new Set(fns.slice(0, n).map((_, i) => i));

	// iterate fns.length - n times moving the indexes along, then n-1 more times for the remaining items until indexes is empty
	// (a while loop rather than do/while, so an empty fns array doesn't hang forever on Promise.race([]))
	let k = n;
	while (indexes.size > 0) {
		const [i, result] = await Promise.race([...indexes].map(async i => {
			results[i] = results[i] ?? fns[i](); // cache the promise there, so a long one keeps running across iterations
			const result = await results[i];
			return [i, result];
		}));
		results[i] = result;
		indexes.delete(i);
		if (k < fns.length) indexes.add(k++);
	}

	return results;
}
const delay = t => new Promise(r => setTimeout(r, t, t));
for (let i=1; i<=6; i++) { console.time(i); console.log(await concurrent([500,3000,800,2000,1500].map(t=>()=>delay(t)), i)); console.timeEnd(i); }

@churchofthought

churchofthought commented Jan 1, 2021

@caub I thought this was the case, but I just confirmed that await x, where x is not a promise, will just return x, which is very helpful with regard to the wrapper issue. Promise.all, Promise.race, and the other combinators seem to handle non-promises with grace too.
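A quick sketch confirming both behaviours (assuming the delay helper from earlier in the thread):

console.log(await 5);                                       // 5: await passes non-thenables straight through
console.log(await Promise.all([5, Promise.resolve(4), 3])); // [5, 4, 3]: plain values are wrapped via Promise.resolve
console.log(await Promise.race([1, delay(100)]));           // 1: an already-available value wins the race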

I was able to remove the recursion, which I think fixes all the issues raised. I've also added an inPlace parameter, in case we do not want ANY extra memory allocation at all and want to just reuse the existing array.

One more thing: I created a stripped-down version called parallelDo which does not store results; it's simply a parallel version of .forEach.

Here's what I got.

const parallelMap = async (arr, f, n = Infinity, inPlace = false) => {
	const results = inPlace ? arr : Array(arr.length) 
	const entriesLeft = arr.entries()

	const worker = async () => {
		for (;;){
			const entry = entriesLeft.next()
			if (entry.done) return

			const [key, val] = entry.value
			results[key] = await f(val, key)
		}
	}

	await Promise.all([...Array(Math.min(arr.length, n))].map(worker))

	return results
}

const parallelDo = async (arr, f, n = Infinity) => {
	const entriesLeft = arr.entries()

	const worker = async () => {
		for (;;){
			const entry = entriesLeft.next()
			if (entry.done) return

			const [key, val] = entry.value
			await f(val, key)
		}
	}

	await Promise.all([...Array(Math.min(arr.length, n))].map(worker))
}

const delay = t => new Promise(r => setTimeout(r, t, t))

// test parallelMap
for (var n = 1; n <= 5; ++n) {
	const tid = `${n} workers`
	console.time(tid); console.log(await parallelMap([100, 200, 400, 800, 1600], delay, n)); console.timeEnd(tid)
}

// test parallelDo
for (var n = 1; n <= 5; ++n) {
	const tid = `${n} workers`
	console.time(tid); console.log(await parallelDo([100, 200, 400, 800, 1600], delay, n)); console.timeEnd(tid)
}

I don't think there's any way to make this code any simpler. Would you like to help me publish an NPM package and maintain it?

@churchofthought

churchofthought commented Jan 1, 2021

@caub Any idea what kind of tests would be ideal for something like this? Timeouts are a good start, but there must be some other kind of client-side async we could use. What immediately comes to mind is having some static files, using util.promisify(fs.readFile), and making sure our maps return the correct transformations of a file with 100,000 digits or something.

It would be nice if we could write tests that don't depend on Node, so that browser-JS package managers like Bower wouldn't get all pissy.

@caub

caub commented Jan 2, 2021

  • I think you can forget about Bower, it's so old; now we use webpack, rollup, ..
  • util.promisify(fs.readFile) -> just use fs.promises.readFile
  • I think using this delay function is good enough for unit tests, just use small timeouts
  • I think you don't need 2 functions; just keep parallelMap, the other is the same without the return value
  • you could replace [...Array(Math.min(arr.length, n))].map(worker) by Array.from({length: Math.min(arr.length, n)}, worker)
  • you could replace your for loop with for (const [key, val] of entriesLeft) { results[key] = await f(val, key); }
async function parallelMap(arr, f, n = Infinity) {
	const results = arr.map(() => undefined);
	let i = 0;
	const worker = async () => {
		while (i < arr.length) {
			const j = i++; // claim the index synchronously, before the first await, so no two workers grab the same item
			results[j] = await f(arr[j], j);
		}
	}

	await Promise.all(Array.from({length: Math.min(arr.length, n)}, worker));

	return results;
}
await parallelMap([100, 200, 400, 800, 1600], delay, 2)

version without async/await if you want:

function parallelMap(arr, f, n = Infinity) {
	const results = arr.map(() => undefined);
	let i = 0;
	const worker = () => {
		if (i >= arr.length) return Promise.resolve();
		const j = i++; // claim the index before any async work starts
		return Promise.resolve(f(arr[j], j))
			.then(r => { results[j] = r; })
			.then(worker); // chain the next item; the recursion lives inside .then, so the stack stays flat
	}
	return Promise.all(Array.from({length: Math.min(arr.length, n)}, worker))
		.then(() => results);
}
const start = Date.now();
const r = await parallelMap([8, 9, 10, 4, 5], delay, 3);
const t = Date.now() - start;
assert.deepStrictEqual(r, [...])
assert.ok(14 < t && t < 14 + delta); // delta ~ 3ms tick time
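For reference, the 14 in those bounds falls out of the schedule (a sketch assuming 3 workers over the delays [8, 9, 10, 4, 5]):

// worker 1: 8 then 4  -> finishes at t = 12
// worker 2: 9 then 5  -> finishes at t = 14
// worker 3: 10        -> finishes at t = 10
// parallelMap resolves at max(12, 14, 10) = 14 ms, hence the 14 < t && t < 14 + delta bounds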

@churchofthought

churchofthought commented Jan 2, 2021

Wow, very nice improvements. I am learning a lot from you. Thank you @caub .

Question: since for...of usually calls Symbol.iterator to grab an iterator, can we be sure in this case that it will just use the iterator as-is, shared between all our workers?

Edit: I just did some testing, and it appears that handing existing iterators to for...of works just fine, so the loop will consume the shared iterator. Beautiful. Thanks again caub, I love elegance!! And you definitely have a lot of experience with it 👍
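A minimal check of that behaviour (built-in iterators carry a Symbol.iterator method that returns the iterator itself, so for...of consumes it in place instead of restarting it):

const it = [10, 20, 30].entries();
console.log(it[Symbol.iterator]() === it); // true: for...of will reuse this exact iterator
for (const [i, v] of it) break;            // consumes only [0, 10]; array iterators have no return() to close them
console.log(it.next().value);              // [1, 20]: the shared iterator kept its position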

@churchofthought

churchofthought commented Jan 3, 2021

Here is the final code with the improvements you mentioned:

const parallelMap = async (arr, f, n = Infinity, inPlace = false) => {
	const results = inPlace ? arr : Array(arr.length) 
	const entries = arr.entries()

	const worker = async () => {
		for (const [key, val] of entries)
			results[key] = await f(val, key)
	}

	await Promise.all(
		Array.from({length: Math.min(arr.length, n)}, worker)
	)

	return results
}

const parallelDo = async (arr, f, n = Infinity) => {
	const entries = arr.entries()

	const worker = async () => {
		for (const [key, val] of entries)
			await f(val, key)
	}

	await Promise.all(
		Array.from({length: Math.min(arr.length, n)}, worker)
	)
}

const delay = t => new Promise(r => setTimeout(r, t, t))

// test parallelMap
for (var n = 1; n <= 5; ++n) {
	const tid = `${n} workers`
	console.time(tid); console.log(await parallelMap([100, 200, 400, 800, 1600], delay, n)); console.timeEnd(tid)
}

// test parallelDo
for (var n = 1; n <= 5; ++n) {
	const tid = `${n} workers`
	console.time(tid); console.log(await parallelDo([100, 200, 400, 800, 1600], delay, n)); console.timeEnd(tid)
}

@tomardern

Here is my attempt; not sure about the performance:

function chunkPromises(array: Promise<any>[], chunkSize: number, delay: number): Promise<any> {
  let p: Promise<any[]> = Promise.resolve([]);
  for (var i = 0; i < array.length; i += chunkSize) {
    const chunk = array.slice(i, i + chunkSize);
    p = i == 0 ? p : p.then(results => new Promise(resolve => setTimeout(resolve, delay, results)));
    p = p.then(results => Promise.all(chunk).then(chunkResults => results.concat(chunkResults)));
  }
  return p;
}

@caub

caub commented Jun 4, 2022

@tomardern if the first argument is an array of Promises, they are all already running (or already settled) by the time the function receives them, so this doesn't actually limit concurrency.

Try writing a test like I did to verify this; you need to pass an array of functions that return promises instead.
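A short sketch of the difference (reusing the delay helper from above); this is why concurrent() and parallelMap() take functions rather than promises:

const eager = delay(1000);       // the timer starts right here, at creation time
const lazy  = () => delay(1000); // nothing runs yet; whoever calls lazy() decides when it starts
// chunkPromises([eager, ...]) can only delay observing the results: all the work is already in flight.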

@churchofthought

churchofthought commented Jun 4, 2022

@tomardern You're copying arrays all over the place (slice, concat) and building one long promise chain. Performance will be abysmal.

In addition, chunking is suboptimal, since the longest-running task in a chunk can drop the effective concurrency to 1; in fact, chunking will always cause spiky concurrency. Most use cases want a fixed concurrency, with new tasks immediately replacing finished ones (see the sketch below).

The parallelMap code that I refined with @caub's help is as optimal as I can envision. It is smoothly concurrent and involves no allocations beyond what's necessary. If you find a way to improve it, that'd be cool. 🤡
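A concrete case of the chunking problem (a sketch, with delays [3000, 100, 100, 100] and n = 2):

// chunked:          [3000, 100] then [100, 100]
//   t=100:  the 100 ms task is done, but its slot sits idle behind the 3000 ms task
//   t=3000: chunk 1 finally drains and chunk 2 starts
//   t=3100: done; concurrency was stuck at 1 for 2900 of those ms
// race-and-replace: 3000 runs in one slot while 100, 100, 100 stream through the other
//   t=3000: done; concurrency stays at 2 for as long as work remains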

@geomus

geomus commented Aug 24, 2022

Hi guys! I know this is an old thread, but I just ran into it. Amazing work! I'm learning a lot from the perf considerations you mention. My approach (allowing for the differences) was to publish to SNS, using the bluebird library (maybe avoidable) together with ramda.

const Promise = require('bluebird');
const { dissoc, splitEvery } = require('ramda'); // the ramda helpers used below
const PASSENGERS_SPLIT = 10;

async function notificationsSNSPublisher(snsPublish, payload) {
  const { passengers } = payload;
  const snsPayload = dissoc('passengers', payload);

  return await Promise.map(splitEvery(PASSENGERS_SPLIT, passengers), async passengersChunk => {
    const messagePayload = {
      ...snsPayload,
      passengers: passengersChunk
    };
    const { MessageId } = await snsPublish(messagePayload, { messageAttributes: {} });

    return MessageId;
  });
}

It's possible to add a delay() function, of course. I'm going to gather parts of the code you posted and improve this.
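Worth noting: bluebird's Promise.map also accepts a concurrency option, so the number of in-flight SNS publishes can be capped directly (a sketch; the limit of 5 and the publishChunk helper are made up for illustration):

const publishChunk = passengersChunk =>
	snsPublish({ ...snsPayload, passengers: passengersChunk }, { messageAttributes: {} });

// at most 5 snsPublish calls in flight at any time; resolves to the array of publish results
await Promise.map(splitEvery(PASSENGERS_SPLIT, passengers), publishChunk, { concurrency: 5 });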

@nk1tz

nk1tz commented Sep 17, 2022

Here is my typescript version:

const throttledConcurrencyPromiseAll = async <T>(
  arr: Array<any>,
  f: (v: any) => Promise<T>,
  n = Infinity,
): Promise<T[]> => {
  const results = Array(arr.length);
  const entries = arr.entries();
  const worker = async () => {
    for (const [key, val] of entries) results[key] = await f(val);
  };
  await Promise.all(Array.from({ length: Math.min(arr.length, n) }, worker));
  return results;
};
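A usage sketch (the URLs and the limit of 5 are made up for illustration):

const urls = ['https://example.com/a', 'https://example.com/b', 'https://example.com/c'];
// at most 5 fetches in flight at once; resolves to the parsed bodies in input order
const data = await throttledConcurrencyPromiseAll(urls, u => fetch(u).then(res => res.json()), 5);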
