const chunks = (arr, chunkSize) => {
  const results = [];
  // note: splice mutates the input array, so callers lose their copy
  while (arr.length) results.push(arr.splice(0, chunkSize));
  return results;
};

module.exports = (xs, f, concurrency) => {
  if (xs.length === 0) return Promise.resolve([]);
  // run one chunk at a time, collecting results in order
  // (the reduce already yields a single promise; wrapping it in
  // Promise.all would throw, since a promise is not iterable)
  return chunks(xs, concurrency).reduce(
    (acc, chunk) =>
      acc.then(results =>
        Promise.all(chunk.map(f)).then(chunkResults => results.concat(chunkResults))
      ),
    Promise.resolve([])
  );
};
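A hedged usage sketch of the same chunked approach, inlined so it is self-contained (the name `promiseAllByChunks` and the doubling task are made up for the example; the input is copied before splicing so the caller's array survives):

```javascript
const chunks = (arr, chunkSize) => {
  const results = [];
  while (arr.length) results.push(arr.splice(0, chunkSize));
  return results;
};

// Same idea as the module above: run one chunk of at most `concurrency`
// tasks at a time, collecting results in order.
const promiseAllByChunks = (xs, f, concurrency) =>
  chunks([...xs], concurrency).reduce( // copy xs so the caller's array survives splice
    (acc, chunk) =>
      acc.then(results =>
        Promise.all(chunk.map(f)).then(chunkResults => results.concat(chunkResults))
      ),
    Promise.resolve([])
  );

// Note: xs must hold *inputs*, not already-started promises,
// for the chunking to actually limit concurrency.
promiseAllByChunks([1, 2, 3, 4, 5], n => Promise.resolve(n * 2), 2)
  .then(results => console.log(results)); // → [ 2, 4, 6, 8, 10 ]
```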
Wow, very nice improvements. I am learning a lot from you. Thank you @caub.
Question: since for...of usually calls Symbol.iterator to grab an iterator, can we be sure in this case that it will use the shared iterator as-is across all our workers?
Edit: I just did some testing, and handing an existing iterator to for...of works just fine, so the loop will work with the shared iterator. Beautiful. Thanks again @caub, I love elegance!! And you definitely have a lot of experience with it 👍
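Why this works can be checked directly: built-in iterators (like the one from `Array.prototype.entries`) return themselves from their own `Symbol.iterator` method, and array iterators define no `return()` hook, so exiting one for...of loop early leaves the iterator open for the next consumer. A quick sketch:

```javascript
// Built-in iterators are themselves iterable: Symbol.iterator returns `this`,
// so several for...of loops (or workers) can share one cursor.
const entries = ['a', 'b', 'c', 'd'].entries();
console.log(entries[Symbol.iterator]() === entries); // → true

const taken = [];
for (const [, v] of entries) { taken.push(v); break; } // takes 'a', then exits early
for (const [, v] of entries) taken.push(v);            // resumes at 'b'
console.log(taken); // → [ 'a', 'b', 'c', 'd' ]
```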
Here is the final code with the improvements you mentioned:
const parallelMap = async (arr, f, n = Infinity, inPlace = false) => {
  const results = inPlace ? arr : Array(arr.length)
  const entries = arr.entries()
  const worker = async () => {
    for (const [key, val] of entries)
      results[key] = await f(val, key)
  }
  await Promise.all(
    Array.from({length: Math.min(arr.length, n)}, worker)
  )
  return results
}
const parallelDo = async (arr, f, n = Infinity) => {
  const entries = arr.entries()
  const worker = async () => {
    for (const [key, val] of entries)
      await f(val, key)
  }
  await Promise.all(
    Array.from({length: Math.min(arr.length, n)}, worker)
  )
}
const delay = t => new Promise(r => setTimeout(r, t, t))

// test parallelMap (top-level await needs a module context, or wrap in an async IIFE)
for (let n = 1; n <= 5; ++n) {
  const tid = `${n} workers`
  console.time(tid); console.log(await parallelMap([100, 200, 400, 800, 1600], delay, n)); console.timeEnd(tid)
}

// test parallelDo (no return value, so just time it)
for (let n = 1; n <= 5; ++n) {
  const tid = `${n} workers`
  console.time(tid); await parallelDo([100, 200, 400, 800, 1600], delay, n); console.timeEnd(tid)
}
Here is my attempt (not sure about performance):
function chunkPromises(array: Promise<any>[], chunkSize: number, delay: number): Promise<any> {
  let p: Promise<any[]> = Promise.resolve([]);
  for (let i = 0; i < array.length; i += chunkSize) {
    const chunk = array.slice(i, i + chunkSize);
    p = i === 0 ? p : p.then(results => new Promise(resolve => setTimeout(resolve, delay, results)));
    p = p.then(results => Promise.all(chunk).then(chunkResults => results.concat(chunkResults)));
  }
  return p;
}
@tomardern if the first argument is an array of Promises, they are all already running (or already resolved/rejected) by the time your function sees them, so this won't actually limit concurrency.
Try writing a test like I did to verify this; you need to pass an array of functions returning promises instead.
@tomardern You're also copying the array on every chunk (slice, concat), so performance will suffer on large inputs.
In addition, chunking is suboptimal: the longest-running task in a chunk can drop effective concurrency down to 1, so chunking always produces spiky concurrency. Most use cases want fixed concurrency, with a new task immediately replacing each finished one.
The parallelMap code I refined with @caub's help is as optimal as I can envision: it is smoothly concurrent and involves no allocations beyond what's necessary. If you find a way to improve it, that'd be cool. 🤡
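The "spiky concurrency" point can be made concrete with a small timing sketch (the helpers `runChunked` and `runPool` are made up here; delays simulate task durations in ms). The chunked runner waits for the slowest task in each chunk, while the worker pool starts a new task as soon as any worker frees up:

```javascript
const delay = t => new Promise(r => setTimeout(r, t));

// Chunked: each Promise.all waits for the slowest member of its chunk.
const runChunked = async (times, size) => {
  for (let i = 0; i < times.length; i += size)
    await Promise.all(times.slice(i, i + size).map(delay));
};

// Worker pool: n workers share one iterator, so a finished worker
// immediately pulls the next task.
const runPool = async (times, n) => {
  const it = times.values();
  const worker = async () => { for (const t of it) await delay(t); };
  await Promise.all(Array.from({ length: n }, worker));
};

const time = async run => { const t0 = Date.now(); await run(); return Date.now() - t0; };

(async () => {
  const tasks = [200, 50, 50, 50, 50];
  console.log('chunked:', await time(() => runChunked(tasks, 2)), 'ms'); // ~300 ms
  console.log('pool:   ', await time(() => runPool(tasks, 2)), 'ms');    // ~200 ms
})();
```

With chunk size 2, the 200 ms task pins its chunk while its partner idles; the pool keeps both workers busy the whole time.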
Hi guys! I know this is an old thread, but I just ran into it. Amazing work, I'm learning a lot from the perf considerations you mention. My approach (with some differences) was to publish to SNS in chunks, making use of the bluebird library (maybe avoidable) together with ramda.
const Promise = require('bluebird');
const { dissoc, splitEvery } = require('ramda');

const PASSENGERS_SPLIT = 10;

async function notificationsSNSPublisher(snsPublish, payload) {
  const { passengers } = payload;
  const snsPayload = dissoc('passengers', payload);
  // note: bluebird's Promise.map runs with unlimited concurrency
  // unless a { concurrency } option is passed
  return Promise.map(splitEvery(PASSENGERS_SPLIT, passengers), async passengersChunk => {
    const messagePayload = {
      ...snsPayload,
      passengers: passengersChunk
    };
    const { MessageId } = await snsPublish(messagePayload, { messageAttributes: {} });
    return MessageId;
  });
}
It's possible to add a delay() function, of course. I'm going to gather parts of the code you posted and improve this.
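For what it's worth, a pause between chunks can also be added without bluebird, using plain promises (a sketch; `publish` is a stand-in for the real snsPublish call, and `splitEvery` mirrors ramda's helper of the same name):

```javascript
const delay = ms => new Promise(r => setTimeout(r, ms));

// Same shape as ramda's splitEvery: [1,2,3,4,5] with n=2 → [[1,2],[3,4],[5]]
const splitEvery = (n, xs) => {
  const out = [];
  for (let i = 0; i < xs.length; i += n) out.push(xs.slice(i, i + n));
  return out;
};

// Publish chunks sequentially with a pause between them,
// collecting whatever each publish call returns.
const publishInChunks = async (items, publish, chunkSize, pauseMs) => {
  const ids = [];
  for (const chunk of splitEvery(chunkSize, items)) {
    if (ids.length) await delay(pauseMs); // pause between chunks, not before the first
    ids.push(await publish(chunk));
  }
  return ids;
};

publishInChunks([1, 2, 3, 4, 5], chunk => Promise.resolve(chunk.length), 2, 10)
  .then(ids => console.log(ids)); // → [ 2, 2, 1 ]
```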
Here is my TypeScript version:
const throttledConcurrencyPromiseAll = async <T>(
  arr: Array<any>,
  f: (v: any) => Promise<T>,
  n = Infinity,
): Promise<T[]> => {
  const results = Array(arr.length);
  const entries = arr.entries();
  const worker = async () => {
    for (const [key, val] of entries) results[key] = await f(val);
  };
  await Promise.all(Array.from({ length: Math.min(arr.length, n) }, worker));
  return results;
};
One more micro-refinement: `[...Array(Math.min(arr.length, n))].map(worker)` can be replaced by `Array.from({length: Math.min(arr.length, n)}, worker)`, which avoids the intermediate spread array; the worker loop itself stays `for (const [key, val] of entriesLeft) { results[key] = await f(val, key); }`.
Here's a version without async/await if you want:
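The snippet that followed appears to have been cut off; here is a sketch of what such a worker pool can look like without async/await (the name `parallelMapNoAwait` is made up for this sketch, not the original poster's code):

```javascript
// Worker-pool map with plain promise chains: each worker keeps pulling
// the next entry from the shared iterator until it is exhausted.
const parallelMapNoAwait = (arr, f, n = Infinity) => {
  const results = Array(arr.length);
  const entries = arr.entries();
  const worker = () => {
    const { done, value } = entries.next();
    if (done) return Promise.resolve();
    const [key, val] = value;
    return Promise.resolve(f(val, key)).then(r => {
      results[key] = r;
      return worker(); // pull the next item from the shared iterator
    });
  };
  return Promise.all(Array.from({ length: Math.min(arr.length, n) }, worker))
    .then(() => results);
};

parallelMapNoAwait([1, 2, 3, 4], x => Promise.resolve(x * 10), 2)
  .then(r => console.log(r)); // → [ 10, 20, 30, 40 ]
```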