Skip to content

Instantly share code, notes, and snippets.

@pkit
Created May 10, 2020 15:40
Show Gist options
  • Save pkit/58887bb0c5c13c8edacc5fd0cb05c53b to your computer and use it in GitHub Desktop.
Save pkit/58887bb0c5c13c8edacc5fd0cb05c53b to your computer and use it in GitHub Desktop.
const Pool = require('pg-pool');
/**
 * Coarse backend states used by the cancellation logic. `parseState` maps the
 * `state` column of `pg_stat_activity` onto these values.
 * Frozen so the shared enum cannot be modified by accident.
 */
const STATE = Object.freeze({
  CANCELLED: 0, // backend is idle or no longer listed: nothing left to cancel
  ACTIVE: 1, // backend is currently executing
  WAITING: 2, // backend is idle inside a (possibly aborted) transaction
});
/**
 * A pg-pool `Pool` that can also stop work running on clients it handed out.
 *
 * Next to the regular pool it keeps a second, small "admin" pool. Connections
 * from that pool are used to look a busy client's backend up in
 * `pg_stat_activity` and to call `pg_cancel_backend` / `pg_terminate_backend`
 * against it.
 */
export class ManagedPool extends Pool {
  /**
   * @param {object} [options] - pg-pool options for the main pool. The admin
   *   pool reuses them, except for its size (`options.adminPoolSize`,
   *   default 1) and `idleTimeoutMillis`, which is forced to 0.
   */
  constructor(options) {
    super(options);
    const adminPoolSize = (options && options.adminPoolSize) || 1;
    const adminOpts = Object.assign({}, options, {
      // pg-pool gives `max` precedence over the legacy `poolSize` alias, so an
      // inherited `options.max` would otherwise decide the admin pool size.
      max: adminPoolSize,
      poolSize: adminPoolSize,
      idleTimeoutMillis: 0, // make admin connection(s) persistent
    });
    this._adminPool = new Pool(adminOpts);
  }

  /**
   * Stop whatever `client` is currently doing on the server.
   *
   * The client's backend is looked up in `pg_stat_activity` and handled by state:
   *  - idle or not listed: nothing to do;
   *  - idle in transaction: roll back on `client` and release it to the pool;
   *  - active: ask the server to cancel the query and check again after `wait` ms;
   *  - same state as on the previous pass: terminate the backend.
   *
   * Errors raised by a follow-up pass are emitted as an `'error'` event on the
   * pool with `(err, client)`.
   *
   * @param {object} client - checked-out client; `client.processID` identifies its backend.
   * @param {number} [wait=1000] - delay in milliseconds before the follow-up check.
   * @param {number} [prevState] - `STATE` value seen by the previous pass (set on retries).
   * @returns {Promise<object|undefined>} the `pg_terminate_backend` query result
   *   when the backend was terminated, otherwise `undefined`.
   */
  async cancel(client, wait = 1000, prevState) {
    const pid = client.processID;
    const adminClient = await this._adminPool.connect();
    try {
      const { rows } = await adminClient.query(`select state from pg_stat_activity where pid = $1`, [pid]);
      const state = parseState(rows);
      if (state === STATE.CANCELLED) {
        // cancelled already, bail out
        return;
      }

      if (state === STATE.WAITING && prevState !== state) {
        // transaction is idle, rollback
        let rollbackError;
        try {
          await client.query('rollback');
        } catch (err) {
          rollbackError = err;
          throw err;
        } finally {
          // Hand the client back to the pool; passing the error (if any) lets
          // the pool discard a connection whose rollback failed.
          // Unfortunately we cannot signal anything to the original consumer.
          client.release(rollbackError);
        }
        // No follow-up pass: once released, this connection may already belong
        // to another consumer and must not be cancelled on their behalf.
        return;
      }

      // The backend is still busy: look again later, remembering what we saw
      // now so that an unchanged state escalates to termination.
      setTimeout(() => {
        this.cancel(client, wait, state).catch((err) => this.emit('error', err, client));
      }, wait);

      // kill if we are here again
      if (prevState === state) {
        return await adminClient.query(`select pg_terminate_backend($1)`, [pid]);
      }

      // transaction is active, try to cancel
      await adminClient.query(`select pg_cancel_backend($1)`, [pid]);
    } finally {
      adminClient.release();
    }
  }
}
/**
 * Translate the result of `select state from pg_stat_activity where pid = ...`
 * into one of the `STATE` values.
 *
 * @param {Array<{state: string}>} rows - rows returned by the query; only the
 *   first row is inspected.
 * @returns {number} `STATE.CANCELLED` when there is no row or the backend is
 *   idle, `STATE.ACTIVE` while it is executing, `STATE.WAITING` while it sits
 *   idle inside a transaction.
 * @throws {Error} when the reported state is none of the handled values.
 */
function parseState (rows) {
  if (rows.length === 0) {
    // no such backend any more: nothing left to cancel
    return STATE.CANCELLED;
  }
  const reported = rows[0].state;
  switch (reported) {
    case 'active':
    case 'fastpath function call': // the value PostgreSQL actually reports
    case 'fastpath': // kept for backward compatibility with the old spelling
      return STATE.ACTIVE;
    case 'idle in transaction':
    case 'idle in transaction (aborted)':
      return STATE.WAITING;
    case 'idle':
      return STATE.CANCELLED;
    default:
      throw new Error(`Unknown state returned: ${reported}`);
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment