Created
February 26, 2022 08:07
-
-
Save DavidWells/f61c7eeec666bb79b798ca3f525650f4 to your computer and use it in GitHub Desktop.
Async forEach with concurrency limit and Abort Controller
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// via https://github.com/vatesfr/xen-orchestra/tree/a1c0d82889dbf40aebb87d6e486dc12fee49adbc/%40vates/async-each | |
const noop = Function.prototype | |
class AggregateError extends Error { | |
constructor(errors, message) { | |
super(message) | |
this.errors = errors | |
} | |
} | |
function asyncEach(iterable, iteratee, { concurrency = 1, signal, stopOnError = true } = {}) { | |
let result = [] | |
return new Promise((resolve, reject) => { | |
const it = (iterable[Symbol.iterator] || iterable[Symbol.asyncIterator]).call(iterable) | |
const errors = [] | |
let running = 0 | |
let index = 0 | |
let onAbort | |
if (signal !== undefined) { | |
onAbort = () => { | |
onRejectedWrapper(new Error('asyncEach aborted')) | |
} | |
signal.addEventListener('abort', onAbort) | |
} | |
const clean = () => { | |
onFulfilled = onRejected = noop | |
if (onAbort !== undefined) { | |
signal.removeEventListener('abort', onAbort) | |
} | |
} | |
resolve = (resolve => | |
function resolveAndClean(value) { | |
resolve(value) | |
clean() | |
})(resolve) | |
reject = (reject => | |
function rejectAndClean(reason) { | |
reject(reason) | |
clean() | |
})(reject) | |
let onFulfilled = value => { | |
console.log('onFulfilled', value) | |
result = result.concat(value) | |
--running | |
next(value) | |
} | |
const onFulfilledWrapper = value => onFulfilled(value) | |
let onRejected = stopOnError | |
? reject | |
: error => { | |
--running | |
errors.push(error) | |
next() | |
} | |
const onRejectedWrapper = reason => onRejected(reason) | |
let nextIsRunning = false | |
let next = async (v) => { | |
if (nextIsRunning) { | |
return | |
} | |
nextIsRunning = true | |
if (running < concurrency) { | |
const cursor = await it.next() | |
console.log('cursor', cursor) | |
if (cursor.done) { | |
next = () => { | |
if (running === 0) { | |
if (errors.length === 0) { | |
resolve(result) | |
} else { | |
reject(new AggregateError(errors)) | |
} | |
} | |
} | |
} else { | |
++running | |
try { | |
const result = iteratee.call(this, cursor.value, index++, iterable) | |
console.log('result', result) | |
let then | |
if (result != null && typeof result === 'object' && typeof (then = result.then) === 'function') { | |
console.log('is thenable') | |
then.call(result, onFulfilledWrapper, onRejectedWrapper) | |
} else { | |
onFulfilled(result) | |
} | |
} catch (error) { | |
onRejected(error) | |
} | |
} | |
nextIsRunning = false | |
return next() | |
} | |
nextIsRunning = false | |
} | |
next() | |
}) | |
} | |
function delay(ms) { | |
console.log('delay ms', ms) | |
return new Promise(res => setTimeout(res, ms)) | |
} | |
async function doIt() { | |
const contents = [] | |
return asyncEach(items, async function (filename, i) { | |
await delay(2000) | |
const name = filename + 'xyz' | |
contents[i] = name | |
return name | |
}, | |
{ | |
// reads two files at a time | |
concurrency: 2, | |
}) | |
} | |
const items = ['foo.txt', 'bar.txt', 'baz.txt'] | |
doIt(items).then((x) => { | |
console.log('x', x) | |
}) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment