@alexpsi
Created February 25, 2017 20:31
Like Promise.all but executed in chunks, so you can have some async concurrency control.
// Split arr into chunks of chunkSize (note: consumes arr via splice).
const chunks = (arr, chunkSize) => {
	let results = [];
	while (arr.length) results.push(arr.splice(0, chunkSize));
	return results;
};

// Run f over xs, `concurrency` items at a time: each chunk runs through Promise.all,
// and the next chunk only starts once the previous one has resolved.
module.exports = (xs, f, concurrency) => {
	if (xs.length == 0) return Promise.resolve();
	return chunks(xs, concurrency).reduce(
		(acc, chunk) => acc.then(() => Promise.all(chunk.map(f))),
		Promise.resolve()
	);
};
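
For illustration, a minimal usage sketch of the export above (the module path, the fetchUser helper, and the endpoint are hypothetical):

const promiseAllChunked = require('./promise-all-chunked'); // hypothetical path to the snippet above
const fetchUser = id => fetch(`/api/users/${id}`).then(r => r.json()); // hypothetical helper

const ids = Array.from({ length: 100 }, (_, i) => i + 1);
// Runs fetchUser over the ids, 5 at a time; note that chunks() consumes the ids array via splice.
promiseAllChunked(ids, fetchUser, 5).then(() => console.log('done'));
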
@walidbagh

@caub @churchofthought All I see here is improvement; both methods are valid, and the performance difference is pretty small.
Thanks to both of you for improving this gist!

@churchofthought

churchofthought commented Jan 1, 2021

@walidbagh You're welcome. Thanks for making the gist!

> Last, I used function wrappers returning promises as the first argument of my concurrent() function; this is a common pattern, used in https://caolan.github.io/async for example

Thanks for letting me know about this. It is a good idea to follow the lead of Promise.all: Promise.all([5, 4, 3]) handles non-promises just fine, so it would make sense for concurrent() to do so as well.
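
For illustration, a quick console check (the values are arbitrary):

// Promise.all passes non-promise values through unchanged.
Promise.all([5, Promise.resolve(4), 3]).then(console.log); // logs [5, 4, 3]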

@caub

caub commented Jan 1, 2021

@churchofthought

Oh yeah, 20k+ items; now I see why perf matters

I've adapted the non-recursive version a bit below. I don't see other ways to do it; it seems Promise.race is necessary there, while Promise.all is needed for the recursive version

async function concurrent(fns, n=Infinity) {
	const results = fns.map(() => undefined);

	// keep iteration efficient by holding a Set of the indexes of the fns promises to run; initially it's [0, 1, ..., n-1]
	const indexes = new Set(fns.slice(0, n).map((_, i) => i));

	// iterate fns.length - n times moving the index window forward, then n - 1 more times for the remaining items, until indexes is empty (we could run a Promise.all with the final chunk too)
	let k = n;
	do {
		const [i, result] = await Promise.race([...indexes].map(async i => {
			results[i] = results[i] ?? fns[i](); // cache promise there, so it continues running across iterations when it's a long one
			const result = await results[i];
			return [i, result];
		}));
		results[i] = result;
		indexes.delete(i);
		if (k < fns.length) indexes.add(k++);
	} while (indexes.size > 0);
	
	return results;
}
const delay = t => new Promise(r => setTimeout(r, t, t));
for (let i=1; i<=6; i++) { console.time(i); console.log(await concurrent([500,3000,800,2000,1500].map(t=>()=>delay(t)), i)); console.timeEnd(i); }

@churchofthought

churchofthought commented Jan 1, 2021

@caub I thought this was the case, but I just confirmed that await x, where x is not a promise, will just return x, which is very helpful with regard to the wrapper issue. Promise.all, Promise.race, and all those methods seem to handle non-promises gracefully too.
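
A quick console check of that (values arbitrary; id is just a hypothetical non-async mapper):

const id = x => x;                            // plain function, returns a plain value
console.log(await id(5) === 5);               // true: await on a non-promise yields the value itself
console.log(await Promise.resolve(5) === 5);  // true: same result when the mapper does return a promise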

I was able to remove the recursion, which I think fixes all the issues raised. I've also added an inPlace parameter, in case we do not want ANY extra memory allocation and just want to reuse the existing array.

One more thing: I created a stripped-down version called parallelDo, which does not store results; it's simply a parallel version of .forEach.

Here's what I got.

const parallelMap = async (arr, f, n = Infinity, inPlace = false) => {
	const results = inPlace ? arr : Array(arr.length) 
	const entriesLeft = arr.entries()

	const worker = async () => {
		for (;;){
			const entry = entriesLeft.next()
			if (entry.done) return

			const [key, val] = entry.value
			results[key] = await f(val, key)
		}
	}

	await Promise.all([...Array(Math.min(arr.length, n))].map(worker))

	return results
}

const parallelDo = async (arr, f, n = Infinity) => {
	const entriesLeft = arr.entries()

	const worker = async () => {
		for (;;){
			const entry = entriesLeft.next()
			if (entry.done) return

			const [key, val] = entry.value
			await f(val, key)
		}
	}

	await Promise.all([...Array(Math.min(arr.length, n))].map(worker))
}

const delay = t => new Promise(r => setTimeout(r, t, t))

// test parallelMap
for (var n = 1; n <= 5; ++n) {
	const tid = `${n} workers`
	console.time(tid); console.log(await parallelMap(await [100, 200, 400, 800, 1600], delay, n)); console.timeEnd(tid)
}

// test parallelDo
for (var n = 1; n <= 5; ++n) {
	const tid = `${n} workers`
	console.time(tid); console.log(await parallelDo(await [100, 200, 400, 800, 1600], delay, n)); console.timeEnd(tid)
}

I don't think there's any way to make this code any simpler. Would you like to help me publish an NPM package and maintain it?

@churchofthought

churchofthought commented Jan 1, 2021

@caub Any idea what kind of tests would be ideal for something like this? Timeouts are a good start, but there must be some other kind of client-side async we could use. What immediately comes to mind is having some static files, using util.promisify(fs.readFile), and making sure our maps return the correct transformations of a file with 100,000 digits or something.

It would be nice if we could write tests that wouldn't necessarily depend on Node, so that browser-JS package managers like Bower wouldn't get all pissy.

@caub

caub commented Jan 2, 2021

  • I think you can forget about Bower; it's so old, and nowadays we use webpack, rollup, etc.
  • util.promisify(fs.readFile) -> just use fs.promises.readFile
  • I think using this delay function is good enough for unit tests; just use small timeouts
  • I think you don't need 2 functions; just keep parallelMap, the other is the same without a return value
  • you could replace [...Array(Math.min(arr.length, n))].map(worker) by Array.from({length: Math.min(arr.length, n)}, worker)
  • you could replace your for loop with for (const [key, val] of entriesLeft) { results[key] = await f(val, key); }
async function parallelMap(arr, f, n = Infinity) {
	const results = arr.map(() => undefined);
	let i = 0;
	const worker = async () => {
		while (i < arr.length) {
			const j = i++; // claim the next index synchronously, before awaiting, so two workers never grab the same item
			results[j] = await f(arr[j], j);
		}
	}

	await Promise.all(Array.from({length: Math.min(arr.length, n)}, worker));

	return results;
}
await parallelMap([100, 200, 400, 800, 1600], delay, 2)

version without async/await if you want:

function parallelMap(arr, f, n = Infinity) {
	const results = arr.map(() => undefined);
	let i = 0;
	const worker = () => arr.reduce(p => p.then(() => {
		const j = i++; // claim the next index synchronously
		if (j >= arr.length) return; // remaining steps become no-ops once every item is claimed
		return Promise.resolve(f(arr[j], j)).then(r => { results[j] = r; });
	}), Promise.resolve());
	return Promise.all(Array.from({length: Math.min(arr.length, n)}, worker))
		.then(() => results);
}
const start = Date.now();
const r = await parallelMap([8, 9, 10, 4, 5], delay, 3);
const t = Date.now() - start;
assert.deepStrictEqual(r, [...])
assert.ok(14 < t && t < 14 + delta); // delta ~ 3ms tick time

@churchofthought

churchofthought commented Jan 2, 2021

Wow, very nice improvements. I am learning a lot from you. Thank you @caub .

Question: since for..of usually calls Symbol.iterator to grab our iterator, can we be sure in this case that it will just use the iterator as-is, shared between all our workers?

Edit: Just did some testing, and it appears that handing existing iterators to for...of works just fine. So the for...of loop will work with the shared iterator. Beautiful. Thanks again caub, I love elegance!! And you definitely have a lot of experience with it 👍
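
For illustration, a tiny check (array values are arbitrary): built-in iterators such as the one returned by entries() are themselves iterable (their Symbol.iterator returns the iterator itself), so for...of picks up a shared, partially-consumed iterator exactly where it left off, and each entry is handed out only once.

const entries = ['a', 'b', 'c'].entries();
console.log(entries[Symbol.iterator]() === entries); // true: the iterator hands back itself

console.log(entries.next().value);               // [0, 'a'] -- consume one entry manually
for (const [i, v] of entries) console.log(i, v); // continues with 1 'b', then 2 'c'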

@churchofthought

churchofthought commented Jan 3, 2021

Here is the final code with the improvements you mentioned:

const parallelMap = async (arr, f, n = Infinity, inPlace = false) => {
	const results = inPlace ? arr : Array(arr.length) 
	const entries = arr.entries()

	const worker = async () => {
		for (const [key, val] of entries)
			results[key] = await f(val, key)
	}

	await Promise.all(
		Array.from({length: Math.min(arr.length, n)}, worker)
	)

	return results
}

const parallelDo = async (arr, f, n = Infinity) => {
	const entries = arr.entries()

	const worker = async () => {
		for (const [key, val] of entries)
			await f(val, key)
	}

	await Promise.all(
		Array.from({length: Math.min(arr.length, n)}, worker)
	)
}

const delay = t => new Promise(r => setTimeout(r, t, t))

// test parallelMap
for (var n = 1; n <= 5; ++n) {
	const tid = `${n} workers`
	console.time(tid); console.log(await parallelMap(await [100, 200, 400, 800, 1600], delay, n)); console.timeEnd(tid)
}

// test parallelDo
for (var n = 1; n <= 5; ++n) {
	const tid = `${n} workers`
	console.time(tid); console.log(await parallelDo(await [100, 200, 400, 800, 1600], delay, n)); console.timeEnd(tid)
}

@tomardern

Here is my attempt - not sure on performance:

function chunkPromises(array: Promise<any>[], chunkSize: number, delay: number): Promise<any> {
  let p: Promise<any[]> = Promise.resolve([]);
  for (var i = 0; i < array.length; i += chunkSize) {
    const chunk = array.slice(i, i + chunkSize);
    p = i == 0 ? p : p.then(results => new Promise(resolve => setTimeout(resolve, delay, results)));
    p = p.then(results => Promise.all(chunk).then(chunkResults => results.concat(chunkResults)));
  }
  return p;
}

@caub

caub commented Jun 4, 2022

@tomardern if the first argument is an array of Promises, they are all already running (or already resolved/rejected), so this won't actually limit concurrency

Try writing a test like I did to verify this; you need to pass an array of functions returning promises instead
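
For illustration, a minimal sketch of the difference (reusing the delay helper from earlier in the thread): a promise starts running the moment it is created, while a function returning a promise only starts when a scheduler calls it.

// All three timers start immediately, before any chunking/throttling code ever sees them:
const alreadyRunning = [delay(100), delay(100), delay(100)];

// These start only when invoked, so a scheduler can control how many run at once:
const notYetRunning = [() => delay(100), () => delay(100), () => delay(100)];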

@churchofthought

churchofthought commented Jun 4, 2022

@tomardern You're mutating/copying (slice, concat) and using recursion. Performance will be abysmal.

In addition, chunking is suboptimal, since the longest-running task in a chunk can drop the effective concurrency down to 1; in fact, chunking will always cause spiky concurrency. Most use cases want a fixed concurrency, with new tasks immediately replacing finished ones.

The parallelMap code that I refined with @caub's help is as optimal as I can envision. It is smoothly concurrent and involves no allocations beyond what's necessary. If you find a way to improve it, that'd be cool. 🤡
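
For illustration, a small timing sketch of that difference (the durations are arbitrary; chunkedMap is a hypothetical chunk-at-a-time runner written here only for comparison, while parallelMap and delay are the ones defined above):

// Hypothetical chunk-at-a-time runner, for comparison only.
const chunkedMap = async (arr, f, n) => {
	const results = [];
	for (let i = 0; i < arr.length; i += n) {
		results.push(...await Promise.all(arr.slice(i, i + n).map(f))); // waits for the whole chunk
	}
	return results;
};

// With durations [900, 100, 100, 100] and n = 2:
// chunkedMap takes ~1000ms (the 900ms task blocks its whole chunk),
// parallelMap takes ~900ms (the short tasks flow through the free worker).
console.time('chunked'); await chunkedMap([900, 100, 100, 100], delay, 2); console.timeEnd('chunked');
console.time('pooled');  await parallelMap([900, 100, 100, 100], delay, 2); console.timeEnd('pooled');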

@geomus

geomus commented Aug 24, 2022

Hi guys! I know this is an old thread, but I just ran into it. Amazing work! I'm learning a lot from the perf considerations you mention. My approach (allowing for the differences) was to set up SNS publishing, making use of the bluebird library (maybe avoidable) together with ramda.

const Promise = require('bluebird');
const { dissoc, splitEvery } = require('ramda'); // dissoc and splitEvery come from ramda
const PASSENGERS_SPLIT = 10;

async function notificationsSNSPublisher(snsPublish, payload) {
  const { passengers } = payload;
  const snsPayload = dissoc('passengers', payload);

  return await Promise.map(splitEvery(PASSENGERS_SPLIT, passengers), async passengersChunk => {
    const messagePayload = {
      ...snsPayload,
      passengers: passengersChunk
    };
    const { MessageId } = await snsPublish(messagePayload, { messageAttributes: {} });

    return MessageId;
  });
}

It's possible to add a delay() function, of course. I'm going to gather parts of the code you posted and improve this.
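
For what it's worth, bluebird's Promise.map also accepts a concurrency option, which gives the same kind of pooling discussed above; a minimal sketch reusing the names from the snippet above (the limit of 4 is arbitrary):

const messageIds = await Promise.map(
	splitEvery(PASSENGERS_SPLIT, passengers),
	async passengersChunk => {
		const { MessageId } = await snsPublish({ ...snsPayload, passengers: passengersChunk }, { messageAttributes: {} });
		return MessageId;
	},
	{ concurrency: 4 } // at most 4 publishes in flight at a time
);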

@nk1tz

nk1tz commented Sep 17, 2022

Here is my TypeScript version:

const throttledConcurrencyPromiseAll = async <T>(
  arr: Array<any>,
  f: (v: any) => Promise<T>,
  n = Infinity,
): Promise<T[]> => {
  const results = Array(arr.length);
  const entries = arr.entries();
  const worker = async () => {
    for (const [key, val] of entries) results[key] = await f(val);
  };
  await Promise.all(Array.from({ length: Math.min(arr.length, n) }, worker));
  return results;
};
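
For illustration, a minimal usage sketch (the endpoints and the fetchJson helper are hypothetical):

const fetchJson = (url) => fetch(url).then(res => res.json()); // hypothetical helper
const urls = ['/api/a', '/api/b', '/api/c'];                   // hypothetical endpoints

// At most 2 requests in flight at a time.
const results = await throttledConcurrencyPromiseAll(urls, fetchJson, 2);
console.log(results);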
