Skip to content

Instantly share code, notes, and snippets.

@clshortfuse
Last active August 2, 2022 15:19
Show Gist options
  • Save clshortfuse/33447c2b731531ecf5a7a224f649e818 to your computer and use it in GitHub Desktop.
Save clshortfuse/33447c2b731531ecf5a7a224f649e818 to your computer and use it in GitHub Desktop.
Tedious Connection Pool
import Connection from 'tedious/lib/connection.js';
/** @typedef {import('tedious').ConnectionConfig} ConnectionConfig */
/** @typedef {import('tedious').ConnectionError} ConnectionError */
/** @typedef {import('tedious').RequestError} RequestError */
const MIN_CONNECTION_POOL = 2;
const MAX_CONNECTION_POOL = 10;
const MAX_CONNECTION_IDLE = 5000;
const MAX_CONNECTION_RETRY_TIME = 30000;
/** @return {[number, number]} */
function getCurrentTime() {
return process.hrtime();
}
class ConnectionPoolItem {
/** @param {Connection} connection */
constructor(connection) {
this.connection = connection;
this.locked = false;
this.connected = false;
this.updateIdleTime();
}
/**
* @param {boolean} val
* @return {void}
*/
setConnected(val) {
this.connected = val;
}
/**
* @param {boolean} val
* @return {void}
*/
setLocked(val) {
this.locked = val;
}
updateIdleTime() {
this.lastIdleTime = getCurrentTime();
}
exceedsIdleTime() {
const timeSince = process.hrtime(this.lastIdleTime);
return (timeSince[0] > Math.trunc(MAX_CONNECTION_IDLE / 1000)
|| timeSince[1] > (MAX_CONNECTION_IDLE % 1000) * 1e9);
}
}
export default class TediousPool {
/** @param {ConnectionConfig} connectionConfig */
constructor(connectionConfig) {
this.connectionConfig = connectionConfig;
/** @type {ConnectionPoolItem[]} */
this.connectionPool = [];
/** @type {{resolve:function(ConnectionPoolItem):any, reject:function(Error):any}[]} */
this.pendingConnectionPromises = [];
this.idleCheckInterval = null;
}
/**
* @param {ConnectionPoolItem} connectionPoolItem
* @return {void}
*/
releaseConnectionPoolItem(connectionPoolItem) {
connectionPoolItem.updateIdleTime();
connectionPoolItem.setLocked(false);
// Handover connection to next in queue
const p = this.pendingConnectionPromises.shift();
p?.resolve(connectionPoolItem);
}
/** @return {Connection} */
buildConnection() {
const connection = new Connection(this.connectionConfig);
connection.addListener('end', () => {
connection.removeAllListeners('error');
const index = this.connectionPool.findIndex((i) => i.connection === connection);
if (index !== -1) {
this.connectionPool.splice(index, 1);
this.pruneConnections();
}
});
connection.addListener('connect', (err) => {
const item = this.connectionPool.find((i) => i.connection === connection);
if (!item) {
if (!err) {
connection.close();
}
// Orphaned connection?
return;
}
item.setConnected(true);
item.updateIdleTime();
const p = this.pendingConnectionPromises.shift();
if (!p) {
// Nothing in queue?
return;
}
if (err) {
p.reject(err);
return;
}
p.resolve(item);
});
connection.addListener('error', () => {
// console.error('errorListener callback', err);
});
connection.connect();
return connection;
}
/** @return {Promise<ConnectionPoolItem>} */
getConnectionPoolItem() {
return new Promise((resolve, reject) => {
/**
* @param {ConnectionPoolItem} item
* @return {void}
*/
const resolveWithLock = (item) => {
item.setLocked(true);
resolve(item);
};
const idleConnection = this.connectionPool.find((i) => i.connected && !i.locked);
if (idleConnection) {
// Found idle connection
resolveWithLock(idleConnection);
return;
}
if (this.connectionPool.length < MAX_CONNECTION_POOL) {
// Build new connection and add to pool
const connection = this.buildConnection();
this.connectionPool.push(new ConnectionPoolItem(connection));
}
// Add to queue
this.pendingConnectionPromises.push({
resolve: resolveWithLock,
reject,
});
});
}
/**
* @template T
* @param {function(Connection):Promise<T>} fn
* @return {Promise<T>}
*/
getConnection(fn) {
return this.getConnectionPoolItem()
.catch((getError) => {
// Get connection failed (likely connection issues)
throw getError;
}).then((i) => {
i.updateIdleTime();
// Wrap in Promise in case fn is not PromiseLike.
return Promise.resolve().then(() => fn(i.connection)).catch((functionError) => {
// Function failed. Release and throw error.
this.releaseConnectionPoolItem(i);
throw functionError;
}).then((result) => {
// Function completed. Release and return result.
this.releaseConnectionPoolItem(i);
return result;
});
});
}
/** @return {void} */
pruneConnections() {
// Sort by locked DESC, queryTime ASC
// Keep minimum pool count
// Close idle connections that have connected at least once
/**
* @param {ConnectionPoolItem} a
* @param {ConnectionPoolItem} b
* @return {number}
*/
function sortConnectionPoolItems(a, b) {
return (b.locked ? 1 : 0) - (a.locked ? 1 : 0)
|| (a.lastIdleTime[0] - b.lastIdleTime[0])
|| (a.lastIdleTime[1] - b.lastIdleTime[1]);
}
this.connectionPool
.slice()
.sort(sortConnectionPoolItems)
.slice(MIN_CONNECTION_POOL)
.filter((c) => c.connected && !c.locked && c.exceedsIdleTime())
.forEach((c) => c.connection.close());
}
/** @return {Promise} */
start() {
let retryTime = 0;
let retries = -1;
/** @return {Promise} */
const run = () => this.getConnectionPoolItem().catch(() => {
if (retryTime < MAX_CONNECTION_RETRY_TIME) {
if (retries === -1) {
retryTime = 0; // Retry immediately
} else {
retryTime = (2 ** retries) * 100; // Exponential backoff
}
}
if (retryTime > MAX_CONNECTION_RETRY_TIME) {
retryTime = MAX_CONNECTION_RETRY_TIME;
}
retries += 1;
return new Promise((resolve) => setTimeout(resolve, retryTime)).then(run);
});
return run().then((connection) => {
this.releaseConnectionPoolItem(connection);
this.idleCheckInterval = setInterval(() => {
this.pruneConnections();
}, MAX_CONNECTION_IDLE);
});
}
/** @return {Promise} */
stop() {
clearInterval(this.idleCheckInterval);
return Promise.all(this.connectionPool.map((i) => i.connection.close));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment