@alexpsi
Created February 25, 2017 20:31
Like Promise.all but executed in chunks, so you can have some async concurrency control.
// splits arr into chunks of chunkSize without mutating the input
const chunks = (arr, chunkSize) => {
  const results = [];
  for (let i = 0; i < arr.length; i += chunkSize) results.push(arr.slice(i, i + chunkSize));
  return results;
};

// runs f over xs one chunk at a time; items within a chunk run concurrently via Promise.all
// note: the returned promise settles once every chunk has settled, but it does not collect the mapped results
module.exports = (xs, f, concurrency) => {
  if (xs.length === 0) return Promise.resolve();
  return chunks(xs, concurrency).reduce(
    (acc, chunk) => acc.then(() => Promise.all(chunk.map(f))),
    Promise.resolve()
  );
};
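A minimal usage sketch, assuming the module above is saved as chunkedAll.js (the filename and the fetchUser stand-in are hypothetical):

const chunkedAll = require('./chunkedAll'); // hypothetical path for the module above

const ids = [1, 2, 3, 4, 5, 6, 7];
const fetchUser = id => Promise.resolve({ id }); // stand-in for a real async call

// maps fetchUser over ids at most 3 at a time, chunk by chunk
chunkedAll(ids, fetchUser, 3).then(() => console.log('all chunks done'));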
@churchofthought

churchofthought commented Jan 1, 2021

@caub Any idea what kind of tests would be ideal for something like this? Timeouts are a good start but there must be some other kind of client-side async we can maybe use. What immediately comes to mind is having some static files and using util.promisify(fs.readFile) and making sure our maps are returning the correct transformations of a file with 100,000 digits or something.

It would be nice if we could write tests that wouldn't necessarily depend on Node, so that browser-JS package managers like Bower wouldn't get all pissy.

@caub

caub commented Jan 2, 2021

  • I think you can forget about bower, it's so old, now we use webpack, rollup, ..
  • util.promisify(fs.readFile) -> just use fs.promises.readFile
  • I think using this delay function is good enough for unit-tests, just use small timeouts
  • I think you don't need 2 functions, just keep parallelMap, the other is the same without return value
  • you could replace [...Array(Math.min(arr.length, n))].map(worker) by Array.from({length: Math.min(arr.length, n)}, worker)
  • you could replace your for loop with for (const [key, val] of entriesLeft) { results[key] = await f(val, key); }
async function parallelMap(arr, f, n = Infinity) {
	const results = arr.map(() => undefined);
	let i = 0;
	const worker = async () => {
		// claim an index synchronously before awaiting, so no two workers process the same element
		while (i < arr.length) {
			const j = i++;
			results[j] = await f(arr[j], j);
		}
	}

	await Promise.all(Array.from({length: Math.min(arr.length, n)}, worker));

	return results;
}
await parallelMap([100, 200, 400, 800, 1600], delay, 2)

version without async/await if you want:

function parallelMap(arr, f, n = Infinity) {
	const results = arr.map(() => undefined);
	let i = 0;
	const worker = () => {
		// each worker keeps chaining the next unclaimed index until the shared counter runs out
		const next = () => {
			if (i >= arr.length) return Promise.resolve();
			const j = i++;
			return Promise.resolve(f(arr[j], j)).then(r => { results[j] = r; }).then(next);
		};
		return next();
	};
	return Promise.all(Array.from({length: Math.min(arr.length, n)}, worker))
		.then(() => results);
}
const start = Date.now();
const r = await parallelMap([8, 9, 10, 4, 5], delay, 3);
const t = Date.now() - start;
assert.deepStrictEqual(r, [...])
assert.ok(14 < t && t < 14 + delta); // delta ~ 3ms tick time

@churchofthought

churchofthought commented Jan 2, 2021

Wow, very nice improvements. I am learning a lot from you. Thank you @caub .

Question: since for...of usually calls Symbol.iterator to grab our iterator, can we be sure in this case that it will just use the iterator as-is, shared between all our workers?

Edit: Just did some testing, and it appears that handing existing iterators to for...of works just fine, so the for...of loop will work with the shared iterator. Beautiful. Thanks again caub, I love elegance!! And you definitely have a lot of experience with it 👍
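For anyone curious, a quick standalone check along those lines (a sketch, not from the original comment): it shows that a single array iterator keeps its position across separate for...of loops, which is exactly what the shared entries() iterator relies on.

const entries = ['a', 'b', 'c'].entries(); // a single iterator object

// for...of calls entries[Symbol.iterator](), which returns the same object,
// so both loops below consume the same underlying position
for (const [i, v] of entries) {
	console.log('first loop:', i, v);
	break; // array iterators have no return() method, so breaking does not close them
}
for (const [i, v] of entries) console.log('second loop:', i, v);
// first loop: 0 a
// second loop: 1 b
// second loop: 2 c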

@churchofthought

churchofthought commented Jan 3, 2021

Here is the final code with the improvements you mentioned:

const parallelMap = async (arr, f, n = Infinity, inPlace = false) => {
	const results = inPlace ? arr : Array(arr.length) 
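	// a single shared entries() iterator: each [index, value] pair is handed to exactly one worker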
	const entries = arr.entries()

	const worker = async () => {
		for (const [key, val] of entries)
			results[key] = await f(val, key)
	}

	await Promise.all(
		Array.from({length: Math.min(arr.length, n)}, worker)
	)

	return results
}

const parallelDo = async (arr, f, n = Infinity) => {
	const entries = arr.entries()

	const worker = async () => {
		for (const [key, val] of entries)
			await f(val, key)
	}

	await Promise.all(
		Array.from({length: Math.min(arr.length, n)}, worker)
	)
}

const delay = t => new Promise(r => setTimeout(r, t, t))

// test parallelMap
for (let n = 1; n <= 5; ++n) {
	const tid = `${n} workers`
	console.time(tid); console.log(await parallelMap([100, 200, 400, 800, 1600], delay, n)); console.timeEnd(tid)
}

// test parallelDo
for (let n = 1; n <= 5; ++n) {
	const tid = `${n} workers`
	console.time(tid); console.log(await parallelDo([100, 200, 400, 800, 1600], delay, n)); console.timeEnd(tid)
}
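For reference, the expected wall-clock times with delays [100, 200, 400, 800, 1600] (ignoring timer overhead): roughly 3100ms with 1 worker, 2100ms with 2, 1800ms with 3, 1700ms with 4, and 1600ms with 5, while parallelMap resolves with [100, 200, 400, 800, 1600] in every case.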

@tomardern

Here is my attempt - not sure on performance:

function chunkPromises(array: Promise<any>[], chunkSize: number, delay: number): Promise<any[]> {
  let p: Promise<any[]> = Promise.resolve([]);
  for (let i = 0; i < array.length; i += chunkSize) {
    const chunk = array.slice(i, i + chunkSize);
    p = i === 0 ? p : p.then(results => new Promise(resolve => setTimeout(resolve, delay, results)));
    p = p.then(results => Promise.all(chunk).then(chunkResults => results.concat(chunkResults)));
  }
  return p;
}

@caub

caub commented Jun 4, 2022

@tomardern if the first argument is an array of Promise, they are all already running (or already resolved/rejected) by the time the function is called, so the chunking doesn't actually limit concurrency

Try writing a test like I did to verify this; you need to pass an array of functions that each return a promise
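For illustration, a small sketch of what that looks like with the parallelMap posted earlier in this thread (makeTask is a hypothetical task factory; each task starts its work only when invoked):

// each task is a function that starts new work when called, not a promise that is already running
const makeTask = ms => () => new Promise(r => setTimeout(r, ms, ms));
const tasks = [100, 200, 400, 800].map(makeTask);

// at most 2 tasks in flight at once, using the parallelMap defined above
const results = await parallelMap(tasks, task => task(), 2);
console.log(results); // [100, 200, 400, 800]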

@churchofthought

churchofthought commented Jun 4, 2022

@tomardern You're copying arrays on every chunk (slice, concat) and building up one long promise chain. Performance will be abysmal on large inputs.

In addition, chunking is suboptimal since the longest-running task in a chunk can lower concurrency down to 1 -- in fact, chunking will always cause spiky concurrency. Most use cases want a fixed concurrency, with new tasks immediately replacing finished ones.

The code for parallelMap that I refined with @caub 's help is as optimal as I can envision. It is smoothly concurrent and involves no allocations beyond what's necessary. If you find a way to improve it, that'd be cool. 🤡

@geomus

geomus commented Aug 24, 2022

Hi guys! I know this is an old thread, but I just ran into it. Amazing work! I'm learning a lot from the perf considerations you mention. My approach (allowing for the differences) was to publish to SNS in chunks, making use of the bluebird library (maybe avoidable) together with ramda.

const Promise = require('bluebird');
const { dissoc, splitEvery } = require('ramda');

const PASSENGERS_SPLIT = 10;

async function notificationsSNSPublisher(snsPublish, payload) {
  const { passengers } = payload;
  const snsPayload = dissoc('passengers', payload);

  // bluebird's Promise.map also accepts { concurrency: N } as a third argument if a limit is needed
  return Promise.map(splitEvery(PASSENGERS_SPLIT, passengers), async passengersChunk => {
    const messagePayload = {
      ...snsPayload,
      passengers: passengersChunk
    };
    const { MessageId } = await snsPublish(messagePayload, { messageAttributes: {} });

    return MessageId;
  });
}

It's possible to add a delay() function, of course. I'm going to gather parts of the code you posted and improve this.
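One possible direction (a sketch only, reusing the parallelMap posted above to cap concurrency; snsPublish, the payload shape, and PASSENGERS_SPLIT are as in the snippet above, and the default concurrency of 3 is arbitrary):

async function notificationsSNSPublisher(snsPublish, payload, concurrency = 3) {
  const { passengers, ...snsPayload } = payload;

  // plain chunking, so the ramda dependency can be dropped
  const chunks = [];
  for (let i = 0; i < passengers.length; i += PASSENGERS_SPLIT) {
    chunks.push(passengers.slice(i, i + PASSENGERS_SPLIT));
  }

  return parallelMap(chunks, async passengersChunk => {
    const { MessageId } = await snsPublish(
      { ...snsPayload, passengers: passengersChunk },
      { messageAttributes: {} }
    );
    return MessageId;
  }, concurrency);
}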

@nk1tz

nk1tz commented Sep 17, 2022

Here is my TypeScript version:

const throttledConcurrencyPromiseAll = async <T, R>(
  arr: T[],
  f: (v: T) => Promise<R>,
  n = Infinity,
): Promise<R[]> => {
  const results: R[] = Array(arr.length);
  const entries = arr.entries();
  const worker = async () => {
    for (const [key, val] of entries) results[key] = await f(val);
  };
  await Promise.all(Array.from({ length: Math.min(arr.length, n) }, worker));
  return results;
};
