Skip to content

Instantly share code, notes, and snippets.

@mvrhov
Created July 10, 2018 18:53
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mvrhov/08870b23efdad46791c30fb49bb8ef66 to your computer and use it in GitHub Desktop.
Save mvrhov/08870b23efdad46791c30fb49bb8ef66 to your computer and use it in GitHub Desktop.
// ConnPool wraps pgx.ConnPool by embedding it. Methods declared on this type
// with the same name as a pgx.ConnPool method (Exec, Query, Begin, ...) shadow
// the embedded ones and take an additional AfterAcquireFunc argument; all other
// pgx.ConnPool methods are promoted unchanged.
type ConnPool struct {
pgx.ConnPool
}
// AfterAcquireFunc is a callback that receives a connection and reports an
// error. AcquireEx invokes it, when non-nil, on a freshly acquired connection.
type AfterAcquireFunc func(c *pgx.Conn) error
// NewConnPool creates a new ConnPool. config is forwarded unchanged to
// pgx.NewConnPool.
//
// If the underlying pool cannot be created, NewConnPool returns a nil
// *ConnPool together with the error. The pointer returned by pgx.NewConnPool
// is only dereferenced after the error check, so a failed construction is
// reported as an error rather than a nil-pointer panic.
//
// The returned ConnPool embeds a copy of the pool value produced by
// pgx.NewConnPool.
func NewConnPool(config pgx.ConnPoolConfig) (p *ConnPool, err error) {
	pp, err := pgx.NewConnPool(config)
	if err != nil {
		return nil, err
	}
	return &ConnPool{ConnPool: *pp}, nil
}
// AcquireEx takes exclusive use of a connection until it is released.
// If fn is non-nil it is called with the acquired connection before the
// connection is handed to the caller.
//
// On success the caller owns the connection and must pass it to Release.
// If acquiring fails, or fn returns an error, AcquireEx returns a nil
// connection together with the error. In the latter case the connection has
// already been handed back to the pool.
func (p *ConnPool) AcquireEx(fn AfterAcquireFunc) (*pgx.Conn, error) {
	c, err := p.ConnPool.Acquire()
	if err != nil {
		return nil, err
	}
	if fn == nil {
		return c, nil
	}
	if err := fn(c); err != nil {
		// Callers such as Exec, Query and BeginEx return immediately on a
		// non-nil error without releasing anything, so the connection must be
		// given back here or it would stay checked out forever.
		p.Release(c)
		return nil, err
	}
	return c, nil
}
// Exec obtains a connection through AcquireEx (which runs fn when it is
// non-nil), executes sql with the given arguments on that connection, and
// releases the connection before returning.
//
// If the connection cannot be obtained, the zero CommandTag and the acquire
// error are returned and nothing is executed.
func (p *ConnPool) Exec(fn AfterAcquireFunc, sql string, arguments ...interface{}) (commandTag pgx.CommandTag, err error) {
	conn, acquireErr := p.AcquireEx(fn)
	if acquireErr != nil {
		return commandTag, acquireErr
	}
	defer p.Release(conn)

	return conn.Exec(sql, arguments...)
}
// ExecEx is the context-aware counterpart of Exec. It obtains a connection
// through AcquireEx (which runs fn when it is non-nil), forwards ctx, sql,
// options and arguments to the connection's ExecEx, and releases the
// connection before returning.
//
// If the connection cannot be obtained, the zero CommandTag and the acquire
// error are returned and nothing is executed.
func (p *ConnPool) ExecEx(ctx context.Context, fn AfterAcquireFunc, sql string, options *pgx.QueryExOptions, arguments ...interface{}) (commandTag pgx.CommandTag, err error) {
	conn, acquireErr := p.AcquireEx(fn)
	if acquireErr != nil {
		return commandTag, acquireErr
	}
	defer p.Release(conn)

	return conn.ExecEx(ctx, sql, options, arguments...)
}
// Query obtains a connection through AcquireEx (which runs fn when it is
// non-nil) and runs sql with args on it.
//
// If the connection cannot be obtained, an empty placeholder *pgx.Rows is
// returned together with the error. If the query itself fails, the connection
// is released here and the rows value and error from the connection are
// returned as-is.
//
// On success this method does not release the connection.
// NOTE(review): the step that would attach this pool to the rows
// (rows.connPool = p) is not performed, so confirm how the connection is
// returned to the pool once the rows are closed.
func (p *ConnPool) Query(fn AfterAcquireFunc, sql string, args ...interface{}) (*pgx.Rows, error) {
	conn, err := p.AcquireEx(fn)
	if err != nil {
		// Callers may defer error checking to the *Rows, so hand back a
		// non-nil value; its closed/err fields cannot be populated from here.
		return &pgx.Rows{}, err
	}

	rows, err := conn.Query(sql, args...)
	if err == nil {
		return rows, nil
	}

	// The query failed: give the connection back before reporting the error.
	p.Release(conn)
	return rows, err
}
// QueryEx is the context-aware counterpart of Query. It obtains a connection
// through AcquireEx (which runs fn when it is non-nil) and forwards ctx, sql,
// options and args to the connection's QueryEx.
//
// If the connection cannot be obtained, an empty placeholder *pgx.Rows is
// returned together with the error. If the query itself fails, the connection
// is released here and the rows value and error from the connection are
// returned as-is.
//
// On success this method does not release the connection.
// NOTE(review): the step that would attach this pool to the rows
// (rows.connPool = p) is not performed, so confirm how the connection is
// returned to the pool once the rows are closed.
func (p *ConnPool) QueryEx(ctx context.Context, fn AfterAcquireFunc, sql string, options *pgx.QueryExOptions, args ...interface{}) (*pgx.Rows, error) {
	conn, err := p.AcquireEx(fn)
	if err != nil {
		// Callers may defer error checking to the *Rows, so hand back a
		// non-nil value; its closed/err fields cannot be populated from here.
		return &pgx.Rows{}, err
	}

	rows, err := conn.QueryEx(ctx, sql, options, args...)
	if err == nil {
		return rows, nil
	}

	// The query failed: give the connection back before reporting the error.
	p.Release(conn)
	return rows, err
}
// QueryRow runs Query with the same arguments and returns its *pgx.Rows
// result converted to a *pgx.Row.
//
// The error returned by Query is discarded here.
// NOTE(review): presumably the error is expected to surface when Scan is
// called on the returned row, and the connection to be released at that
// point — confirm both, since neither is established by this wrapper.
func (p *ConnPool) QueryRow(fn AfterAcquireFunc, sql string, args ...interface{}) *pgx.Row {
	result, _ := p.Query(fn, sql, args...)
	row := (*pgx.Row)(result)
	return row
}
// QueryRowEx is the context-aware counterpart of QueryRow. It runs QueryEx
// with the same arguments and returns its *pgx.Rows result converted to a
// *pgx.Row.
//
// The error returned by QueryEx is discarded here.
// NOTE(review): presumably the error is expected to surface when Scan is
// called on the returned row — confirm, since this wrapper does not
// establish it.
func (p *ConnPool) QueryRowEx(ctx context.Context, fn AfterAcquireFunc, sql string, options *pgx.QueryExOptions, args ...interface{}) *pgx.Row {
	result, _ := p.QueryEx(ctx, fn, sql, options, args...)
	row := (*pgx.Row)(result)
	return row
}
// Begin starts a transaction by delegating to BeginEx with a background
// context and nil transaction options. fn is passed through to BeginEx.
func (p *ConnPool) Begin(fn AfterAcquireFunc) (*pgx.Tx, error) {
	ctx := context.Background()
	return p.BeginEx(ctx, fn, nil)
}
// BeginEx obtains a connection through AcquireEx (which runs fn when it is
// non-nil) and starts a transaction on it, with txOptions determining the
// transaction mode.
//
// If acquiring a connection fails, that error is returned. If starting the
// transaction fails, the connection is released; the attempt is then repeated
// on a newly acquired connection only when the failed connection is no longer
// alive and ctx has not been cancelled or timed out. Otherwise the error is
// returned.
//
// On success this method does not release the connection.
// NOTE(review): the step that would attach this pool to the transaction
// (tx.connPool = p) is not performed, so confirm how the connection is
// returned to the pool once the transaction is closed.
func (p *ConnPool) BeginEx(ctx context.Context, fn AfterAcquireFunc, txOptions *pgx.TxOptions) (*pgx.Tx, error) {
	for {
		conn, err := p.AcquireEx(fn)
		if err != nil {
			return nil, err
		}

		tx, err := conn.BeginEx(ctx, txOptions)
		if err == nil {
			return tx, nil
		}

		// Record liveness before releasing the connection.
		connAlive := conn.IsAlive()
		p.Release(conn)

		// A dead connection is the only failure a fresh connection could
		// cure, and retrying is pointless once ctx is done.
		if !connAlive && ctx.Err() == nil {
			continue
		}
		return nil, err
	}
}
// CopyFrom obtains a connection through AcquireEx (which runs fn when it is
// non-nil), forwards tableName, columnNames and rowSrc to the connection's
// CopyFrom, and releases the connection before returning.
//
// If the connection cannot be obtained, 0 and the acquire error are returned.
func (p *ConnPool) CopyFrom(fn AfterAcquireFunc, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int, error) {
	conn, acquireErr := p.AcquireEx(fn)
	if acquireErr != nil {
		return 0, acquireErr
	}
	defer p.Release(conn)

	return conn.CopyFrom(tableName, columnNames, rowSrc)
}
// BeginBatch returns a new, empty *pgx.Batch.
//
// NOTE(review): despite the name, no connection is acquired here and fn is
// currently unused — the AcquireEx call and the conn/connPool/err field
// assignments below are commented out. Confirm whether the returned batch is
// usable before relying on this method.
func (p *ConnPool) BeginBatch(fn AfterAcquireFunc) *pgx.Batch {
//c, err := p.AcquireEx(fn)
return &pgx.Batch{ /*conn: c, connPool: p, err: err*/ }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment