Skip to content

Instantly share code, notes, and snippets.

@kevinburkeshyp
Last active August 29, 2015 14:26
Show Gist options
  • Save kevinburkeshyp/edadef3bd85b6dd0dddf to your computer and use it in GitHub Desktop.
Save kevinburkeshyp/edadef3bd85b6dd0dddf to your computer and use it in GitHub Desktop.
# Database connection interface modeled heavily on Go's database/sql library.
#
# If you need to make a query, call:
#
# DBConnection.get (err, conn) ->
# conn.query 'SELECT foo FROM bar', (err, result) ->
# console.log result.rows[0]
# conn.release()
#
# You'll need to release the connection yourself. It's not recommended, but
# it's safe to call `conn.release()` twice.
#
# If you need a database transaction, call `DBConnection.begin`:
#
# DBConnection.begin (err, txn) ->
# txn.query 'UPDATE ... ', (err, result) ->
# txn.rollback() # or txn.commit()
#
# The DBTransaction object that's returned has the same interface, but instead
# of releasing the connection, call `rollback` or `commit`, which will release
# the connection for you.
#
# You should NOT continue to use the connection after calling `release`,
# `commit`, or `rollback`.
#
# This library is callback-based because the underlying library relies heavily
# on it. If you need a Promise based interface, wrap the calling functions in
# Promise.promisify calls.
pg = require 'pg'
DBTransaction = require './DBTransaction'
Metrics = require '../services/Metrics'
DEFAULT_CONNECTION_POOL_SIZE = 100
module.exports = class DBConnection
# Create a new DBConnection instance. Outside code shouldn't create
# DBConnection instances directly - use `DBConnection.get` or
# `DBConnection.begin` instead.
#
# `client`: a pg.Client instance
# `releaseFn`: function to release this connection back to the `pg` pool.
constructor: (client, releaseFn) ->
@client = client
@releaseFn = releaseFn
@_released = false
# Get a new database connection. Will hit the callback with (err,
# DBConnection). Extensive documentation for the parameters can be found
# here: https://github.com/brianc/node-postgres/wiki/pg#method-connect
#
# The caller is responsible for releasing the connection by calling
# release(true) when they are finished with the query. Otherwise it will stay
# open for up to 30 seconds.
#
# NB: This will block if all connections in the pool are checked out! If
# latency is important, consider setting a timeout and canceling the query.
# More discussion here: https://github.com/brianc/node-postgres/issues/805
@get: (cb) ->
DBConnection._setPoolSize()
start = Date.now()
pg.connect sails.config.connections.postgresql, (err, client, release) ->
try
poolUtilization = DBConnection._getPoolUtilization()
Metrics.measure 'db.txn_conn_pool.count', poolUtilization
Metrics.measure 'db.txn_conn_pool.total', pg.defaults.poolSize
Metrics.increment 'db.txn_conn_pool.get'
Metrics.timing 'db.txn_conn_pool.get.latency', start
cb err, new DBConnection(client, release)
# Begin gets a new connection and begins a transaction. Hits the callback
# with an error or a DBTransaction object.
#
# The caller is responsible for calling commit() or rollback() to complete
# the transaction in every code path; open transactions crush database
# performance. If Postgres returns a syntax error, the client loses the
# connection, or there's a connection timeout, the connection should
# automatically be released.
@begin: (cb) ->
DBConnection.get (err, conn) ->
conn.query 'BEGIN', (err) ->
if err
Metrics.increment 'db.txn.begin.error'
# Connection was released already in conn.query
return cb err
Metrics.increment 'db.txn.begin.success'
cb null, new DBTransaction(conn)
# Query makes the given query to the database. If the query fails with a
# connection error or Postgres error, this will release the connection before
# hitting the callback with an error.
#
# Query accepts two different signatures:
#
# (sql, cb): A SQL command to execute and a callback to hit
# (sql, values, cb): A SQL command to execute, an array of values to
# interpolate into the query (parameters), and a callback to hit.
query: (args..., cb) ->
@client.query args..., (err, res) =>
if err
# We're not at a scale yet where we need to think about keeping
# connections alive & the correctness of doing so. Just release every
# time there's an error.
@release(true)
return cb err
cb null, res
# Release this connection back to the pool. `dispose=true` will destroy
# the underlying connection object - it's the safest mode, in case PG is
# still trying to send back data, but may result in unnecessary connection
# teardowns/latency.
release: (dispose) ->
# I'd feel better with a lock around this, but since an object should
# exist only in the scope of one process, and this code is all
# synchronous, it shouldn't be possible to read @released = false in two
# threads, and release twice.
if @_released
return
@releaseFn(dispose)
@_released = true
poolUtilization = DBConnection._getPoolUtilization()
Metrics.measure 'db.txn_conn_pool.release.count', poolUtilization
@_setPoolSize: ->
pg.defaults.poolSize = DBConnection._getPoolSize()
# 06/17/2015: Each dyno allows 100 connections between Node and PGBouncer,
# and 20 connections between PGBouncer and the server. We want to set the
# pool size to a value between these two. Hopefully we don't ever hit this
# limit.
@_getPoolSize: ->
defaultPoolSize = parseInt(process.env.PGBOUNCER_MAX_CLIENT_CONN, 10)
if defaultPoolSize > 0
return defaultPoolSize
else
return DEFAULT_CONNECTION_POOL_SIZE
# getPoolUtilization synchronously returns the current number of open
# connections as an integer, or throws an error if that number can't be
# determined.
@_getPoolUtilization: ->
# node-postgres creates one pools for each unique JSON.stringify(the DB
# config object). we only ever connect with one configuration object, so
# there should only be one pool
key = Object.keys(pg.pools.all)[0]
if key?
pool = pg.pools.all[key]
# this should always succeed, but let's be safe
if pool?
return pool.getPoolSize()
throw new Error("DBConnection: Couldn't get pool size. Make sure at least one connection's been made")
# Transaction interface heavily based on Go's database/sql library
#
# Get a transaction
#
# DBConnection.begin (err, txn) ->
# ... Work with the transaction object..
#
# The DBTransaction object has three methods:
#
# - query, which has the same interface as client.query in the node-postgres library
# - commit, which commits the transaction
# - rollback, which aborts the transaction
#
# Example usage:
#
# DBConnection.begin (err, txn) ->
# txn.query 'UPDATE foo WHERE bar='baz', (err, result) ->
# txn.query 'UPDATE bankaccounts WHERE bar='baz', (err, result) ->
# if result.rows is 0
# txn.rollback cb
# else
# txn.commit cb
#
# Open transactions are extremely harmful to performance, and should be
# avoided. The caller should ensure all code paths are calling commit() or
# rollback(). Better yet, just use normal database queries for situations where
# it's not critical that two records are updated in sync.
#
# A transaction will also be aborted in the event of a Postgres syntax error
# or a connection error.
Metrics = require '../services/Metrics'
module.exports = class DBTransaction
constructor: (conn) ->
@conn = conn
# Rollback the given transaction and release the connection.
#
# Hits the callback with an error if Postgres reports an error, or the
# connection fails. The connection will also be released in that case.
rollback: (cb) ->
@conn.query 'ROLLBACK', (err) =>
if err
Metrics.increment 'db.txn.rollback.error'
# Connection was released already
if cb
cb err
return
Metrics.increment 'db.txn.rollback.success'
@conn.release(true)
if cb
cb null
# Query the database with the given args and callback.
#
# Accepts the same arguments as `conn.query` in the node-postgres library and
# DBConnection.query in this library.
query: (args..., cb) ->
@conn.query args..., cb
# Commit the given transaction and release the connection.
#
# Hits the callback with an error if Postgres reports an error, or the
# connection fails. The connection will also be released in that case.
commit: (cb) ->
@conn.query 'COMMIT', (err) =>
if err
# Connection was released already
Metrics.increment 'db.txn.commit.error'
if cb
cb err
return
Metrics.increment 'db.txn.commit.success'
@conn.release(true)
if cb
cb null
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment