Last active
April 27, 2018 06:25
-
-
Save alexsasharegan/7f61266dbcc130fb1ee22709d299d8b7 to your computer and use it in GitHub Desktop.
Generic pool using Promises.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { EventEmitter } from "events"; | |
/** | |
* FactoryFunc returns a Promise | |
* that resolves with type T. | |
*/ | |
export type FactoryFunc<T> = () => Promise<T>; | |
/** | |
* DestructorFunc takes a type T | |
* and returns a promise when it has been destroyed. | |
*/ | |
export type DestructorFunc<T> = (t: T) => Promise<void>; | |
type DeferredPromise<T> = (value: T | PromiseLike<T>) => void; | |
/** | |
* PoolOptions specify the pool's behavior. | |
*/ | |
export type PoolOptions<T> = { | |
factory: FactoryFunc<T>; | |
destructor: DestructorFunc<T>; | |
max?: number; | |
min?: number; | |
maxRetries?: number; | |
bufferOnStart?: boolean; | |
}; | |
/** | |
* Pooler pools values of type `<T>` for reuse. | |
* Values that are expensive to create (e.g. db connections) | |
* can be created ahead of time and used by the application. | |
*/ | |
export interface Pooler<T> extends EventEmitter { | |
/** | |
* Get returns a promise that resolve with a value of type `<T>`. | |
* If a value is available, the promise is resolved immediately. | |
* If a value is not available, the caller will be enqueued | |
* and resolved in FIFO order. | |
* | |
* @event release Fired immediately before the promise is resolved with `T`. | |
*/ | |
Get(): Promise<T>; | |
/** | |
* Put returns a value of type `<T>` back to the pool. | |
* If the pool is already full, the destructor for `T` will invoked | |
* and the value will be discarded. | |
* | |
* @event added Fired only if `T` was added to the buffer. | |
*/ | |
Put(x: T): void; | |
/** | |
* Size returns the current size of the buffer pool; | |
* the number of type `<T>` available in the pool. | |
*/ | |
Size(): number; | |
/** | |
* Use takes an async callback to run with a value of type `<T>`. | |
* Use abstracts both Pooler.Get and Pooler.Put logic, | |
* invoking the callback with `T` and | |
* returning it to the pool once resolved | |
* or destroying it on error. | |
*/ | |
Use( | |
callback: (x: T) => Promise<void>, | |
onError?: (err: any) => void | |
): Promise<void>; | |
/** | |
* Buffer will asynchronously buffer type `<T>` up to max. | |
* The returned promise resolves once the buffering is complete. | |
* If the pool is currently buffering, Buffer returns early. | |
* | |
* @event buffering Fired immediately before buffering begins. | |
* @event full Fired once buffering is complete, before resolving. | |
*/ | |
Buffer(): Promise<void>; | |
/** | |
* Drain empties the pool and invokes the destructor on each value `<T>`. | |
* It resolves once all destructors have resolved. | |
* | |
* @event drained Fired once the pool is drained. | |
*/ | |
Drain(): Promise<void>; | |
} | |
export function NewPooler<T>(options: PoolOptions<T>): Pool<T> { | |
return new Pool<T>(options); | |
} | |
export class Pool<T> extends EventEmitter implements Pooler<T> { | |
private buf: T[] = []; | |
private deferred: DeferredPromise<T>[] = []; | |
private filling: boolean = false; | |
private draining: boolean = false; | |
private factory: FactoryFunc<T>; | |
private destructor: DestructorFunc<T>; | |
private max: number; | |
private min: number; | |
private maxRetries: number; | |
constructor(options: PoolOptions<T>) { | |
// EventEmitter constructor. | |
super(); | |
// Extract options with defaults. | |
let { | |
factory, | |
destructor, | |
max = 10, | |
min = 3, | |
maxRetries = 3, | |
bufferOnStart = true, | |
} = options; | |
// Assignment. | |
this.factory = factory; | |
this.destructor = destructor; | |
this.max = max; | |
this.min = min; | |
this.maxRetries = maxRetries; | |
// monitorLevels will refill the pool if our minimum is reached. | |
this.on("release", this.monitorLevels.bind(this)); | |
// flushDeferred will resolve callers awaiting a value T. | |
this.on("added", this.flushDeferred.bind(this)); | |
if (bufferOnStart) { | |
// Buffer the pool. | |
this.Buffer(); | |
} | |
} | |
private async retryFactory(retries: number): Promise<T> { | |
if (!retries) { | |
let err = new RangeError( | |
`Max attempts to create new pooled type exceeded.` | |
); | |
err.name = "RetryLimitError"; | |
throw err; | |
} | |
try { | |
return await this.factory(); | |
} catch (err) { | |
console.error(err); | |
return await this.retryFactory(--retries); | |
} | |
} | |
public async Buffer(): Promise<void> { | |
if (this.filling) { | |
await new Promise(r => this.once("full", r)); | |
return; | |
} | |
let fillTo = this.max - this.Size(); | |
if (fillTo < 1) { | |
return; | |
} | |
let ps: Promise<void>[] = []; | |
this.filling = true; | |
this.emit("buffering"); | |
while (fillTo--) { | |
ps.push( | |
(async () => { | |
this.Put(await this.retryFactory(this.maxRetries)); | |
})() | |
); | |
} | |
await Promise.all(ps); | |
this.filling = false; | |
this.emit("full"); | |
} | |
private monitorLevels() { | |
if (this.Size() < this.min && !this.draining) { | |
this.Buffer(); | |
} | |
} | |
private async flushDeferred() { | |
let d: DeferredPromise<T> | undefined; | |
let x: T | undefined; | |
// Run while we have callers waiting and buffered <T> to release. | |
while ( | |
this.Size() > 0 && | |
(d = this.deferred.shift()) && | |
(x = this.buf.pop()) | |
) { | |
this.emit("release"); | |
d(x); | |
} | |
} | |
public Size(): number { | |
return this.buf.length; | |
} | |
public async Get(): Promise<T> { | |
// Release a value <T> from the pool. | |
let x = this.buf.pop(); | |
if (!x) { | |
// Create a promise to be resolved once the buffer refills. | |
// Internal 'added' event listener will flush these deferred promises. | |
return new Promise<T>(resolve => { | |
this.deferred.push(resolve); | |
}); | |
} | |
// Now that we're guaranteed a value, emit our release event. | |
this.emit("release"); | |
return x; | |
} | |
public Put(x: T): void { | |
if (this.Size() >= this.max || this.draining) { | |
this.destructor(x); | |
return; | |
} | |
let y: T; | |
for (y of this.buf) { | |
if (x === y) { | |
throw new TypeError(`Cannot 'Put' duplicate value.`); | |
} | |
} | |
this.buf.push(x); | |
this.emit("added"); | |
} | |
public async Use( | |
callback: (x: T) => Promise<void>, | |
onError?: (err: any) => void | |
): Promise<void> { | |
let x = await this.Get(); | |
try { | |
await callback(x); | |
} catch (err) { | |
console.error(err); | |
this.destructor(x); | |
if (onError) { | |
onError(err); | |
} | |
return; | |
} | |
this.Put(x); | |
} | |
public async Drain(): Promise<void> { | |
if (this.draining) { | |
await new Promise(r => this.once("drained", r)); | |
return; | |
} | |
let ps: Promise<void>[] = []; | |
let x: T | undefined; | |
this.draining = true; | |
while ((x = this.buf.pop())) { | |
ps.push(this.destructor(x)); | |
} | |
await Promise.all(ps); | |
this.draining = false; | |
this.emit("drained"); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment