Skip to content

Instantly share code, notes, and snippets.

@myobie
Last active January 25, 2021 15:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save myobie/e095274f449f7fd438bf26ab338fce82 to your computer and use it in GitHub Desktop.
Save myobie/e095274f449f7fd438bf26ab338fce82 to your computer and use it in GitHub Desktop.
Drain an array of potential promises with a max concurrency factor
/** @typedef {() => Promise<unknown>} Task */
/**
* @param {Task[]} pending
* @param {number} max
* @returns {Promise<unknown[]>}
*/
export function drain (pending, max) {
return new Promise((resolve, reject) => {
let nextIndex = 0
/** @type {Map<number, Promise<unknown>>} */
const inProgress = new Map()
/** @type {unknown[]} */
const complete = []
/** @type {Error | undefined} */
let firstError
next(max)
function finish () {
if (firstError) {
reject(firstError)
} else {
resolve(complete)
}
}
/** @param {number} [amount=1] */
function next (amount = 1) {
if (pending.length === 0 && inProgress.size === 0) {
finish()
return
}
if (inProgress.size < max && pending.length > 0) {
const callbacks = pending.slice(0, amount)
pending.splice(0, amount)
callbacks.forEach(cb => {
const promise = cb()
const currentIndex = nextIndex
nextIndex++
inProgress.set(currentIndex, promise)
promise
.then(v => {
complete[currentIndex] = v
})
.catch(e => {
if (!firstError) {
firstError = e
}
})
.finally(() => {
inProgress.delete(currentIndex)
next()
})
})
}
}
})
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment