Last active
October 20, 2016 14:12
-
-
Save calebboyd/ac6cd8e6bda8c6c1626696aa9cbd915e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Queue from 'double-ended-queue' | |
const POOL_ID = Symbol('pool-id') | |
const IS_DIRTY = Symbol('dirty') | |
function isDirty (resource) { | |
return resource[IS_DIRTY] | |
} | |
function poolId (resource) { | |
return resource[POOL_ID] | |
} | |
function range (num) { | |
return Array.from(Array(num)) | |
} | |
class Deferred<T> { | |
public promise: Promise<T> | |
public resolve: (answer: T) => void | |
public reject: (error: Error) => void | |
constructor () { | |
this.promise = new Promise((resolve, reject) => { | |
this.resolve = resolve | |
this.reject = reject | |
}) | |
} | |
} | |
function isDefined (thing: any) { | |
return typeof thing !== 'undefined' | |
} | |
class Pool<T> { | |
private _closing = false | |
private _min: number = 0 | |
private _max: number = 10 | |
private _idle: number = 30000 | |
private _resourceIds: Queue<number> | |
private _resources: Queue<{ expiration: number, resource: T}> | |
private _issued: { [key: string]: T } = {} | |
private _pending: Queue<Deferred<T>> = new Queue<Deferred<T>>() | |
private _timeoutToken: any | |
private _size: number = 0 | |
private _initializing = false | |
private _createResource: () => Promise<T> | |
private _destroyResource: (resource: T) => Promise<any> | |
constructor ({ createConnection, destroyConnection, idle, min, max }) { | |
this._max = max || this._max | |
this._min = min || this._min | |
this._idle = idle || this._idle | |
this._resources = new Queue<{ expiration: number, resource: T}>(max) | |
this._createResource = () => { | |
this._size++ | |
return Promise.resolve(createConnection()) | |
} | |
this._destroyResource = (x) => { | |
this._size-- | |
return Promise.resolve(destroyConnection(x)) | |
} | |
this._resourceIds = new Queue(range(this.max).map((_, i) => 1 + i)) | |
if (this.min > 0) { | |
this._initializing = true | |
Promise.all(range(this.min).map(this._createResource)) | |
.then(resources => { | |
this._resources.enqueue( | |
...resources.map(resource => ({ expiration: Date.now(), resource })) | |
) | |
this._pending.toArray().forEach(x => x.resolve(this.resource())) | |
this._initializing = false | |
this._pending.clear() | |
}) | |
} | |
} | |
/** | |
* Current number of resources in pool | |
*/ | |
public get size () { return this._size } | |
/** | |
* Current number of resources in use | |
*/ | |
public get issued () { return this.max - this._resourceIds.length } | |
/** | |
* Maximum size of pool | |
*/ | |
public get max () { return this._max } | |
/** | |
* Minimum size of pool | |
*/ | |
public get min () { return this._min } | |
/** | |
* Approximate lifetime (ms) of an unused resource | |
*/ | |
public get idle () { return this._idle } | |
/** | |
* Forcibly destroy all created resources | |
* To close the pool gracefully, call #drain first | |
* A subsequent call to #resource is a programming error | |
*/ | |
public close () { | |
this._closing = true | |
return Promise.all(Object.keys(this._issued) | |
.map(x => this._issued[x]) | |
.concat(this._resources.toArray().map(x => x.resource)) | |
.map(x => this.destroy(x))) | |
} | |
/** | |
* Wait until all resources have been released | |
* This will not prevent resources from being issued. (calls to #resource) | |
* Usually followed by a call to #close. | |
* Calling close while draining, will short circuit this process | |
*/ | |
public drain () { | |
return new Promise ((resolve) => { | |
!function wait () { | |
if (this.issued === 0) { | |
resolve() | |
} else { | |
!this._closing && setTimeout(wait, 100) || resolve() | |
} | |
}() | |
}) | |
} | |
/** | |
* Destroy a resource. This is useful if you deterime the resource is in an error state. | |
* It can be called at any time. | |
* If a destroyed resource is currently issued it will also be released | |
*/ | |
public destroy (resource) { | |
if (isDirty(resource)) { | |
return Promise.resolve() | |
} | |
resource[IS_DIRTY] = true | |
if (poolId(resource)) { | |
this.release(resource) | |
} | |
return this._destroyResource(resource) | |
} | |
/** | |
* Immediately release a resource back into the pool. | |
* If the resource has not also been destroyed it may be recycled immediately. | |
* Released resources that remain unused for #idle milliseconds will be destroyed. | |
*/ | |
public release (resource) { | |
const resourceId = poolId(resource) | |
if (!isDirty(resource) && this._pending.length) { | |
return this._pending.dequeue().resolve(resource) | |
} | |
delete this._issued[resourceId] | |
delete resource[POOL_ID] | |
this._resourceIds.enqueue(resourceId) | |
!isDirty(resource) && this._queuePossibleDestruction(resource) | |
} | |
/** | |
* Request a resource, if none exist, request will be queued or one will be created. | |
* Otherwise, previously released resources are issued. | |
*/ | |
public resource () { | |
if (this._closing) { | |
return Promise.reject(new Error('Cannot issue resource while pool is closing...')) | |
} | |
if (!this._resourceIds.length || this._initializing) { | |
const futureAvailableResource = new Deferred<T>() | |
this._pending.enqueue(futureAvailableResource) | |
return futureAvailableResource.promise | |
} | |
const resourceId = this._resourceIds.dequeue() | |
if (this._resources.length) { | |
const { resource } = this._resources.dequeue() | |
if (isDirty(resource)) { | |
this._resourceIds.enqueue(resourceId) | |
return this.resource() | |
} | |
return Promise.resolve(this._dispatchResource(resource, resourceId)) | |
} | |
return this._createResource().then(resource => { | |
return this._dispatchResource(resource, resourceId) | |
}) | |
} | |
private _dispatchResource (resource: any, resourceId: number) { | |
resource[POOL_ID] = resourceId | |
this._issued[resourceId] = resource | |
return resource | |
} | |
private _queuePossibleDestruction (resource :T) { | |
this._resources.enqueue({ expiration: Date.now() + this._idle, resource }) | |
if (!this._timeoutToken) { | |
this._scheduleNextCleanup(this._idle) | |
} | |
} | |
private _cleanup () { | |
if (this.size === this._min || !this._resources.length) { | |
return this._timeoutToken = null | |
} | |
const { expiration } = this._resources.peekFront(), | |
expiredMsAgo = expiration - Date.now() | |
if (expiredMsAgo <= 0) { | |
const { resource } = this._resources.dequeue() | |
this.destroy(resource) | |
this._scheduleNextCleanup(100) | |
} else { | |
this._scheduleNextCleanup(expiredMsAgo + 1) | |
} | |
} | |
private _scheduleNextCleanup(ms: number) { | |
if (!this._closing) { | |
this._timeoutToken = setTimeout(() => { | |
this._cleanup() | |
}, ms) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment