Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save DavidWells/f61c7eeec666bb79b798ca3f525650f4 to your computer and use it in GitHub Desktop.
Save DavidWells/f61c7eeec666bb79b798ca3f525650f4 to your computer and use it in GitHub Desktop.
Async forEach with concurrency limit and Abort Controller
// via https://github.com/vatesfr/xen-orchestra/tree/a1c0d82889dbf40aebb87d6e486dc12fee49adbc/%40vates/async-each
// Shared do-nothing callable: Function.prototype is itself a function that
// accepts any arguments and returns undefined when invoked.
const noop = Function.prototype
/**
 * Error that bundles several underlying failures into one object.
 *
 * It is declared in this file, so references to `AggregateError` here resolve
 * to this class rather than to any built-in global of the same name.
 */
class AggregateError extends Error {
  /**
   * @param {Array} errors - The individual failures being reported together;
   *   stored as-is (not copied) on `this.errors`.
   * @param {string} [message] - Optional summary forwarded to `Error`.
   */
  constructor(errors, message) {
    super(message)
    // Without this the inherited name is "Error", so `err.name` and
    // `String(err)` would not reveal that this is an aggregate.
    this.name = 'AggregateError'
    this.errors = errors
  }
}
/**
 * Calls `iteratee` once per item of `iterable`, keeping at most `concurrency`
 * calls in flight, and collects the values they produce.
 *
 * @param {Iterable|AsyncIterable} iterable - Source of items. Its
 *   `Symbol.iterator` is preferred; `Symbol.asyncIterator` is the fallback.
 * @param {Function} iteratee - Invoked as `iteratee(item, index, iterable)`
 *   with the same `this` that `asyncEach` was called with. May return a plain
 *   value or a thenable object.
 * @param {object} [options]
 * @param {number} [options.concurrency=1] - Maximum number of unfinished
 *   `iteratee` calls at any time. Must be at least 1.
 * @param {AbortSignal} [options.signal] - When it aborts (or is already
 *   aborted), the returned promise rejects with `Error('asyncEach aborted')`
 *   and no further items are started.
 * @param {boolean} [options.stopOnError=true] - When true, the first failure
 *   rejects the returned promise and stops scheduling. When false, every item
 *   is still processed and failures are reported together at the end.
 * @returns {Promise<Array>} Resolves with one entry per item, stored at the
 *   item's index (input order, not completion order); array values are kept
 *   as-is rather than flattened. Rejects with the first failure
 *   (`stopOnError: true`), with an `AggregateError` whose `errors` lists the
 *   failures (`stopOnError: false`), with the abort error, with an error
 *   thrown by the iterator itself, or with a `RangeError` for an invalid
 *   `concurrency`.
 */
function asyncEach(iterable, iteratee, { concurrency = 1, signal, stopOnError = true } = {}) {
  return new Promise((resolve, reject) => {
    const it = (iterable[Symbol.iterator] || iterable[Symbol.asyncIterator]).call(iterable)
    const results = []
    const errors = []
    let running = 0
    let index = 0
    let exhausted = false
    let pulling = false
    let settled = false
    let onAbort

    // Single exit point: settles the outer promise once and detaches the
    // abort listener so the signal does not keep this closure alive.
    const settle = (finish, value) => {
      if (settled) {
        return
      }
      settled = true
      if (onAbort !== undefined) {
        signal.removeEventListener('abort', onAbort)
      }
      finish(value)
    }

    // A limit below 1 (or NaN) could never start a task, so the promise
    // would stay pending forever; fail loudly instead.
    if (!(concurrency >= 1)) {
      settle(reject, new RangeError('asyncEach concurrency must be at least 1'))
      return
    }

    if (signal !== undefined) {
      // The 'abort' event only fires once, so an already-aborted signal has
      // to be detected explicitly.
      if (signal.aborted) {
        settle(reject, new Error('asyncEach aborted'))
        return
      }
      // Abort always rejects, independently of stopOnError.
      onAbort = () => settle(reject, new Error('asyncEach aborted'))
      signal.addEventListener('abort', onAbort)
    }

    // Settles the outer promise once the source is drained and nothing is
    // still running.
    const finishIfIdle = () => {
      if (!exhausted || running !== 0) {
        return
      }
      if (errors.length === 0) {
        settle(resolve, results)
      } else {
        settle(reject, new AggregateError(errors))
      }
    }

    const taskDone = () => {
      --running
      pump()
    }

    const onFulfilled = (slot, value) => {
      // Completions arriving after the outer promise settled are ignored.
      if (settled) {
        return
      }
      results[slot] = value
      taskDone()
    }

    const onRejected = error => {
      if (settled) {
        return
      }
      if (stopOnError) {
        settle(reject, error)
        return
      }
      errors.push(error)
      taskDone()
    }

    // Pulls items and starts tasks until the limit is reached, the source is
    // drained, or the outer promise has settled. Only one pump runs at a
    // time; completions that arrive meanwhile are picked up by its loop.
    const pump = async () => {
      if (pulling) {
        return
      }
      pulling = true
      try {
        while (!settled && !exhausted && running < concurrency) {
          const cursor = await it.next()
          if (settled) {
            // Aborted or failed while waiting for the next item.
            break
          }
          if (cursor.done) {
            exhausted = true
            break
          }
          const slot = index++
          ++running
          try {
            const outcome = iteratee.call(this, cursor.value, slot, iterable)
            let then
            if (outcome !== null && typeof outcome === 'object' && typeof (then = outcome.then) === 'function') {
              then.call(outcome, value => onFulfilled(slot, value), onRejected)
            } else {
              onFulfilled(slot, outcome)
            }
          } catch (error) {
            onRejected(error)
          }
        }
      } catch (error) {
        // The iterator itself failed; nothing more can be pulled.
        settle(reject, error)
      }
      pulling = false
      finishIfIdle()
    }

    pump()
  })
}
/**
 * Logs the requested wait, then returns a promise that fulfils (with no
 * value) once a `setTimeout` of `ms` milliseconds fires.
 *
 * @param {number} ms - Timeout passed to `setTimeout`.
 * @returns {Promise<undefined>}
 */
function delay(ms) {
  console.log('delay ms', ms)
  return new Promise(function armTimer(done) {
    setTimeout(done, ms)
  })
}
/**
 * Demo driver: passes each entry of `files` through `asyncEach`, waiting two
 * seconds per entry and producing the entry with 'xyz' appended.
 *
 * @param {Iterable<string>} [files=items] - Entries to process. Defaults to
 *   the module-level `items`, which is what calls without an argument used.
 * @returns {Promise} Whatever `asyncEach` settles with for these entries.
 */
async function doIt(files = items) {
  return asyncEach(
    files,
    async function (filename) {
      // Artificial pause standing in for slow work
      await delay(2000)
      return filename + 'xyz'
    },
    {
      // handles two entries at a time
      concurrency: 2,
    }
  )
}
// Sample input for the demo run
const items = ['foo.txt', 'bar.txt', 'baz.txt']
doIt(items)
  .then((x) => {
    console.log('x', x)
  })
  .catch((error) => {
    // Report the failure explicitly rather than leaving the rejection unhandled
    console.error('asyncEach demo failed', error)
  })
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment