Skip to content

Instantly share code, notes, and snippets.

@calebboyd
Last active October 20, 2016 14:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save calebboyd/ac6cd8e6bda8c6c1626696aa9cbd915e to your computer and use it in GitHub Desktop.
Save calebboyd/ac6cd8e6bda8c6c1626696aa9cbd915e to your computer and use it in GitHub Desktop.
import Queue from 'double-ended-queue'
const POOL_ID = Symbol('pool-id')
const IS_DIRTY = Symbol('dirty')
function isDirty (resource) {
return resource[IS_DIRTY]
}
function poolId (resource) {
return resource[POOL_ID]
}
function range (num) {
return Array.from(Array(num))
}
class Deferred<T> {
public promise: Promise<T>
public resolve: (answer: T) => void
public reject: (error: Error) => void
constructor () {
this.promise = new Promise((resolve, reject) => {
this.resolve = resolve
this.reject = reject
})
}
}
function isDefined (thing: any) {
return typeof thing !== 'undefined'
}
class Pool<T> {
private _closing = false
private _min: number = 0
private _max: number = 10
private _idle: number = 30000
private _resourceIds: Queue<number>
private _resources: Queue<{ expiration: number, resource: T}>
private _issued: { [key: string]: T } = {}
private _pending: Queue<Deferred<T>> = new Queue<Deferred<T>>()
private _timeoutToken: any
private _size: number = 0
private _initializing = false
private _createResource: () => Promise<T>
private _destroyResource: (resource: T) => Promise<any>
constructor ({ createConnection, destroyConnection, idle, min, max }) {
this._max = max || this._max
this._min = min || this._min
this._idle = idle || this._idle
this._resources = new Queue<{ expiration: number, resource: T}>(max)
this._createResource = () => {
this._size++
return Promise.resolve(createConnection())
}
this._destroyResource = (x) => {
this._size--
return Promise.resolve(destroyConnection(x))
}
this._resourceIds = new Queue(range(this.max).map((_, i) => 1 + i))
if (this.min > 0) {
this._initializing = true
Promise.all(range(this.min).map(this._createResource))
.then(resources => {
this._resources.enqueue(
...resources.map(resource => ({ expiration: Date.now(), resource }))
)
this._pending.toArray().forEach(x => x.resolve(this.resource()))
this._initializing = false
this._pending.clear()
})
}
}
/**
* Current number of resources in pool
*/
public get size () { return this._size }
/**
* Current number of resources in use
*/
public get issued () { return this.max - this._resourceIds.length }
/**
* Maximum size of pool
*/
public get max () { return this._max }
/**
* Minimum size of pool
*/
public get min () { return this._min }
/**
* Approximate lifetime (ms) of an unused resource
*/
public get idle () { return this._idle }
/**
* Forcibly destroy all created resources
* To close the pool gracefully, call #drain first
* A subsequent call to #resource is a programming error
*/
public close () {
this._closing = true
return Promise.all(Object.keys(this._issued)
.map(x => this._issued[x])
.concat(this._resources.toArray().map(x => x.resource))
.map(x => this.destroy(x)))
}
/**
* Wait until all resources have been released
* This will not prevent resources from being issued. (calls to #resource)
* Usually followed by a call to #close.
* Calling close while draining, will short circuit this process
*/
public drain () {
return new Promise ((resolve) => {
!function wait () {
if (this.issued === 0) {
resolve()
} else {
!this._closing && setTimeout(wait, 100) || resolve()
}
}()
})
}
/**
* Destroy a resource. This is useful if you deterime the resource is in an error state.
* It can be called at any time.
* If a destroyed resource is currently issued it will also be released
*/
public destroy (resource) {
if (isDirty(resource)) {
return Promise.resolve()
}
resource[IS_DIRTY] = true
if (poolId(resource)) {
this.release(resource)
}
return this._destroyResource(resource)
}
/**
* Immediately release a resource back into the pool.
* If the resource has not also been destroyed it may be recycled immediately.
* Released resources that remain unused for #idle milliseconds will be destroyed.
*/
public release (resource) {
const resourceId = poolId(resource)
if (!isDirty(resource) && this._pending.length) {
return this._pending.dequeue().resolve(resource)
}
delete this._issued[resourceId]
delete resource[POOL_ID]
this._resourceIds.enqueue(resourceId)
!isDirty(resource) && this._queuePossibleDestruction(resource)
}
/**
* Request a resource, if none exist, request will be queued or one will be created.
* Otherwise, previously released resources are issued.
*/
public resource () {
if (this._closing) {
return Promise.reject(new Error('Cannot issue resource while pool is closing...'))
}
if (!this._resourceIds.length || this._initializing) {
const futureAvailableResource = new Deferred<T>()
this._pending.enqueue(futureAvailableResource)
return futureAvailableResource.promise
}
const resourceId = this._resourceIds.dequeue()
if (this._resources.length) {
const { resource } = this._resources.dequeue()
if (isDirty(resource)) {
this._resourceIds.enqueue(resourceId)
return this.resource()
}
return Promise.resolve(this._dispatchResource(resource, resourceId))
}
return this._createResource().then(resource => {
return this._dispatchResource(resource, resourceId)
})
}
private _dispatchResource (resource: any, resourceId: number) {
resource[POOL_ID] = resourceId
this._issued[resourceId] = resource
return resource
}
private _queuePossibleDestruction (resource :T) {
this._resources.enqueue({ expiration: Date.now() + this._idle, resource })
if (!this._timeoutToken) {
this._scheduleNextCleanup(this._idle)
}
}
private _cleanup () {
if (this.size === this._min || !this._resources.length) {
return this._timeoutToken = null
}
const { expiration } = this._resources.peekFront(),
expiredMsAgo = expiration - Date.now()
if (expiredMsAgo <= 0) {
const { resource } = this._resources.dequeue()
this.destroy(resource)
this._scheduleNextCleanup(100)
} else {
this._scheduleNextCleanup(expiredMsAgo + 1)
}
}
private _scheduleNextCleanup(ms: number) {
if (!this._closing) {
this._timeoutToken = setTimeout(() => {
this._cleanup()
}, ms)
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment