A small Node.js module that allows concurrent processing of any AsyncIterable via an async generator
import { randomUUID } from 'crypto'

// A concurrency-limited pool of promises generated from any async-iterable feed.
// A processor function is required: it takes each value from the async iteration and returns a result.
// The processor function can be async.
//
// Only [concurrency] operations run at once.
// The pool fills as fast as the async iteration can provide values, up to the max concurrency.
export default async function* asyncPool(concurrency: number, itemFeed: AsyncIterable<any>, iteratorFn: (...args: any[]) => any) {
  const promiseMap = new Map()

  // Wait for the first in-flight promise to settle, remove it from the pool, and return its value
  async function completePromise() {
    const [id, value] = await Promise.race(promiseMap.values())
    promiseMap.delete(id)
    return value
  }

  for await (const item of itemFeed) {
    const id = randomUUID() // Assign a random id to the promise so it can be removed from the pool once it settles
    const promise: any = Promise.all([id, iteratorFn(item)])
    promiseMap.set(id, promise)
    if (promiseMap.size >= concurrency) {
      yield await completePromise()
    }
  }

  // Finish any remaining promises still running
  while (promiseMap.size) {
    yield await completePromise()
  }
}
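
To see the behaviour described in the comments above, here is a minimal, self-contained sketch (the numberFeed generator and the delay-based double processor are made up purely for illustration). It pushes the numbers 0 through 9 through the pool with a concurrency of 3 and logs each result as it completes; it assumes an ES module context where top-level await is available.

import asyncPool from './asyncPool'

// Hypothetical feed: an async generator that yields the numbers 0 through 9
async function* numberFeed() {
  for (let i = 0; i < 10; i++) {
    yield i
  }
}

// Hypothetical processor: waits a random amount of time, then doubles the value
const double = async (n: number) => {
  await new Promise((resolve) => setTimeout(resolve, Math.random() * 100))
  return n * 2
}

// At most 3 doublings run at a time; results arrive in completion order, not input order
for await (const result of asyncPool(3, numberFeed(), double)) {
  console.log(result)
}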
Example usage:

import asyncPool from './asyncPool'

const dbStream = getReadableDatabaseStream()
const processor = async (dbItem) => { return await someProcessing(dbItem) }

let resultCount = 0

// Concurrently run up to 10 operations on dbItems
for await (const result of asyncPool(10, dbStream, processor)) {
  // result is the return value of the processor function
  resultCount++
}

console.log(`Processed ${resultCount} results`)
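
One caveat: if a processor call rejects, the rejection propagates out of Promise.race inside completePromise, so the async generator throws at its yield and the surrounding for await loop rethrows it, abandoning any items not yet pulled from the feed. A possible workaround, sketched below with the same hypothetical someProcessing helper as in the example above, is to catch errors inside the processor so the pool keeps draining:

// Hypothetical wrapper: never rejects, so one failed item cannot abort the whole pool
const safeProcessor = async (dbItem) => {
  try {
    return await someProcessing(dbItem)
  } catch (err) {
    return { item: dbItem, error: err }
  }
}

for await (const result of asyncPool(10, dbStream, safeProcessor)) {
  // Check result.error here to handle items that failed
}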
