Skip to content

Instantly share code, notes, and snippets.

@alexsasharegan
Last active April 27, 2018 06:25
Show Gist options
  • Save alexsasharegan/7f61266dbcc130fb1ee22709d299d8b7 to your computer and use it in GitHub Desktop.
Save alexsasharegan/7f61266dbcc130fb1ee22709d299d8b7 to your computer and use it in GitHub Desktop.
Generic pool using Promises.
import { EventEmitter } from "events";
/**
* FactoryFunc returns a Promise
* that resolves with type T.
*/
export type FactoryFunc<T> = () => Promise<T>;
/**
* DestructorFunc takes a type T
* and returns a promise when it has been destroyed.
*/
export type DestructorFunc<T> = (t: T) => Promise<void>;
type DeferredPromise<T> = (value: T | PromiseLike<T>) => void;
/**
* PoolOptions specify the pool's behavior.
*/
export type PoolOptions<T> = {
factory: FactoryFunc<T>;
destructor: DestructorFunc<T>;
max?: number;
min?: number;
maxRetries?: number;
bufferOnStart?: boolean;
};
/**
* Pooler pools values of type `<T>` for reuse.
* Values that are expensive to create (e.g. db connections)
* can be created ahead of time and used by the application.
*/
export interface Pooler<T> extends EventEmitter {
/**
* Get returns a promise that resolve with a value of type `<T>`.
* If a value is available, the promise is resolved immediately.
* If a value is not available, the caller will be enqueued
* and resolved in FIFO order.
*
* @event release Fired immediately before the promise is resolved with `T`.
*/
Get(): Promise<T>;
/**
* Put returns a value of type `<T>` back to the pool.
* If the pool is already full, the destructor for `T` will invoked
* and the value will be discarded.
*
* @event added Fired only if `T` was added to the buffer.
*/
Put(x: T): void;
/**
* Size returns the current size of the buffer pool;
* the number of type `<T>` available in the pool.
*/
Size(): number;
/**
* Use takes an async callback to run with a value of type `<T>`.
* Use abstracts both Pooler.Get and Pooler.Put logic,
* invoking the callback with `T` and
* returning it to the pool once resolved
* or destroying it on error.
*/
Use(
callback: (x: T) => Promise<void>,
onError?: (err: any) => void
): Promise<void>;
/**
* Buffer will asynchronously buffer type `<T>` up to max.
* The returned promise resolves once the buffering is complete.
* If the pool is currently buffering, Buffer returns early.
*
* @event buffering Fired immediately before buffering begins.
* @event full Fired once buffering is complete, before resolving.
*/
Buffer(): Promise<void>;
/**
* Drain empties the pool and invokes the destructor on each value `<T>`.
* It resolves once all destructors have resolved.
*
* @event drained Fired once the pool is drained.
*/
Drain(): Promise<void>;
}
export function NewPooler<T>(options: PoolOptions<T>): Pool<T> {
return new Pool<T>(options);
}
export class Pool<T> extends EventEmitter implements Pooler<T> {
private buf: T[] = [];
private deferred: DeferredPromise<T>[] = [];
private filling: boolean = false;
private draining: boolean = false;
private factory: FactoryFunc<T>;
private destructor: DestructorFunc<T>;
private max: number;
private min: number;
private maxRetries: number;
constructor(options: PoolOptions<T>) {
// EventEmitter constructor.
super();
// Extract options with defaults.
let {
factory,
destructor,
max = 10,
min = 3,
maxRetries = 3,
bufferOnStart = true,
} = options;
// Assignment.
this.factory = factory;
this.destructor = destructor;
this.max = max;
this.min = min;
this.maxRetries = maxRetries;
// monitorLevels will refill the pool if our minimum is reached.
this.on("release", this.monitorLevels.bind(this));
// flushDeferred will resolve callers awaiting a value T.
this.on("added", this.flushDeferred.bind(this));
if (bufferOnStart) {
// Buffer the pool.
this.Buffer();
}
}
private async retryFactory(retries: number): Promise<T> {
if (!retries) {
let err = new RangeError(
`Max attempts to create new pooled type exceeded.`
);
err.name = "RetryLimitError";
throw err;
}
try {
return await this.factory();
} catch (err) {
console.error(err);
return await this.retryFactory(--retries);
}
}
public async Buffer(): Promise<void> {
if (this.filling) {
await new Promise(r => this.once("full", r));
return;
}
let fillTo = this.max - this.Size();
if (fillTo < 1) {
return;
}
let ps: Promise<void>[] = [];
this.filling = true;
this.emit("buffering");
while (fillTo--) {
ps.push(
(async () => {
this.Put(await this.retryFactory(this.maxRetries));
})()
);
}
await Promise.all(ps);
this.filling = false;
this.emit("full");
}
private monitorLevels() {
if (this.Size() < this.min && !this.draining) {
this.Buffer();
}
}
private async flushDeferred() {
let d: DeferredPromise<T> | undefined;
let x: T | undefined;
// Run while we have callers waiting and buffered <T> to release.
while (
this.Size() > 0 &&
(d = this.deferred.shift()) &&
(x = this.buf.pop())
) {
this.emit("release");
d(x);
}
}
public Size(): number {
return this.buf.length;
}
public async Get(): Promise<T> {
// Release a value <T> from the pool.
let x = this.buf.pop();
if (!x) {
// Create a promise to be resolved once the buffer refills.
// Internal 'added' event listener will flush these deferred promises.
return new Promise<T>(resolve => {
this.deferred.push(resolve);
});
}
// Now that we're guaranteed a value, emit our release event.
this.emit("release");
return x;
}
public Put(x: T): void {
if (this.Size() >= this.max || this.draining) {
this.destructor(x);
return;
}
let y: T;
for (y of this.buf) {
if (x === y) {
throw new TypeError(`Cannot 'Put' duplicate value.`);
}
}
this.buf.push(x);
this.emit("added");
}
public async Use(
callback: (x: T) => Promise<void>,
onError?: (err: any) => void
): Promise<void> {
let x = await this.Get();
try {
await callback(x);
} catch (err) {
console.error(err);
this.destructor(x);
if (onError) {
onError(err);
}
return;
}
this.Put(x);
}
public async Drain(): Promise<void> {
if (this.draining) {
await new Promise(r => this.once("drained", r));
return;
}
let ps: Promise<void>[] = [];
let x: T | undefined;
this.draining = true;
while ((x = this.buf.pop())) {
ps.push(this.destructor(x));
}
await Promise.all(ps);
this.draining = false;
this.emit("drained");
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment