Skip to content

Instantly share code, notes, and snippets.

@kjin
Last active March 8, 2018 18:26
Show Gist options
  • Save kjin/ab81053f07b893af4ba61b337ad1b518 to your computer and use it in GitHub Desktop.
Save kjin/ab81053f07b893af4ba61b337ad1b518 to your computer and use it in GitHub Desktop.
import { AsyncResource, executionAsyncId } from 'async_hooks';
/** Callback through which a task reports its error and its output. */
type AsyncTaskCallback<T> = (err: Error, output: T) => void;

/** An asynchronous task: takes one input and signals completion via `cb`. */
type AsyncTask<I, O> = (input: I, cb: AsyncTaskCallback<O>) => void;

/** Counter that gives every pool a distinct `POOL-<n>` async resource type. */
let ID = 0;

/**
 * Wraps a function that represents an asynchronous task so that only a certain
 * number of instances of that task can be happening at a time. If the function
 * is invoked when the maximum number of running tasks is met, it will be queued
 * to be run later, in first-in-first-out order.
 *
 * Each invocation is tracked by its own `AsyncResource` (type `POOL-<n>`), and
 * the task is started inside that resource's async scope.
 *
 * A worker slot is released only when the task invokes its callback, so the
 * task is expected to call it exactly once.
 *
 * @param task The function to wrap.
 * @param numWorkers The maximum number of instances of that task that can run
 * at a time. With a value below 1 every invocation is queued and never started.
 * @returns A function with the same signature as `task` that enforces the
 * concurrency limit.
 */
export function wrapPool<I, O>(task: AsyncTask<I, O>, numWorkers: number): AsyncTask<I, O> {
  let numWorkersActive = 0;
  // Invocations waiting for a free worker slot, oldest first.
  const queue: Array<() => void> = [];
  const id = ID++;

  return (input: I, cb: AsyncTaskCallback<O>): void => {
    // Created when the wrapped function is called (not when the task finally
    // starts), so a queued invocation keeps the resource made at call time.
    const asyncResource = new AsyncResource(`POOL-${id}`);

    const execute = (): void => {
      numWorkersActive++;
      // runInAsyncScope replaces the removed emitBefore()/emitAfter() pair and
      // still leaves the scope correctly if `task` throws synchronously.
      asyncResource.runInAsyncScope(() => {
        task(input, (err: Error, output: O) => {
          numWorkersActive--;
          try {
            // Hand the freed slot to the oldest waiting invocation before
            // notifying the caller, as the original implementation did.
            const next = queue.shift();
            if (next !== undefined) {
              next();
            }
            cb(err, output);
          } finally {
            // emitDestroy must be called on the resource itself; passing the
            // bare method to setImmediate would lose `this`. The finally block
            // guarantees the destroy hook is scheduled even if `cb` throws.
            setImmediate(() => {
              asyncResource.emitDestroy();
            });
          }
        });
      });
    };

    if (numWorkersActive < numWorkers) {
      execute();
    } else {
      queue.push(execute);
    }
  };
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment