Skip to content

Instantly share code, notes, and snippets.

@omerkaya1
Last active May 6, 2024 12:25
Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save omerkaya1/fc272877536c4ed4af9f36e2ae57bf08 to your computer and use it in GitHub Desktop.
Save omerkaya1/fc272877536c4ed4af9f36e2ae57bf08 to your computer and use it in GitHub Desktop.
Golang PostgreSQL connection pool example Go ^1.18
package db
import (
"context"
"database/sql"
"net"
"net/url"
"strconv"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/jackc/pgx/v5/stdlib"
"github.com/pkg/errors"
"go.uber.org/zap"
)
type (
Conn interface {
Release()
Exec(ctx context.Context, sql string, arguments ...any) (pgconn.CommandTag, error)
Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error)
QueryRow(ctx context.Context, sql string, args ...any) pgx.Row
}
ConnPool interface {
Acquire(ctx context.Context) (Conn, error)
}
PGX struct {
pool *pgxpool.Pool
}
Config struct {
ConnRetry int
MaxOpenConns int
MinOpenConns int
DescriptionCacheCapacity int
StatementCacheCapacity int
ConnTimeout time.Duration
MaxOpenConnTTL time.Duration
MaxIdleConnTTL time.Duration
MaxConnLifetimeJitterTTL time.Duration
QueryExecMode string
User string
Password string
Host string
Port string
Name string
}
)
func (cfg Config) GetDSN() string { // nolint:gocritic
query := make(url.Values)
if cfg.MaxOpenConns > 0 {
query.Set("pool_max_conns", strconv.Itoa(cfg.MaxOpenConns))
}
if cfg.MinOpenConns > 0 {
query.Set("pool_min_conns", strconv.Itoa(cfg.MinOpenConns))
}
if cfg.MaxOpenConnTTL > 0 {
query.Set("pool_max_conn_lifetime", cfg.MaxOpenConnTTL.String())
}
if cfg.MaxIdleConnTTL > 0 {
query.Set("pool_max_conn_idle_time", cfg.MaxIdleConnTTL.String())
}
if cfg.MaxIdleConnTTL > 0 {
query.Set("pool_max_conn_idle_time", cfg.MaxIdleConnTTL.String())
}
if cfg.MaxConnLifetimeJitterTTL > 0 {
query.Set("pool_max_conn_lifetime_jitter", cfg.MaxConnLifetimeJitterTTL.String())
}
if cfg.StatementCacheCapacity >= 0 {
query.Set("statement_cache_capacity", strconv.Itoa(cfg.StatementCacheCapacity))
}
if cfg.DescriptionCacheCapacity >= 0 {
query.Set("description_cache_capacity", strconv.Itoa(cfg.DescriptionCacheCapacity))
}
if cfg.QueryExecMode != "" {
query.Set("default_query_exec_mode", cfg.QueryExecMode)
}
dsn := url.URL{
Scheme: "postgres",
User: url.UserPassword(cfg.User, cfg.Password),
Host: net.JoinHostPort(cfg.Host, cfg.Port),
Path: cfg.Name,
RawQuery: query.Encode(),
}
return dsn.String()
}
func NewPGX(ctx context.Context, cfg Config) (*PGX, error) { //nolint:gocritic
pool, err := pgxpool.New(ctx, cfg.GetDSN())
if err != nil {
return nil, errors.Wrap(err, "create db conn pool")
}
if err = PingConnection(ctx, &cfg, func(pingCtx context.Context) error {
return pool.Ping(pingCtx)
}); err != nil {
return nil, errors.Wrap(err, "create db conn pool")
}
return &PGX{pool: pool}, nil
}
func NewSQL(ctx context.Context, cfg Config) (*sql.DB, error) { //nolint:gocritic
poolCfg, err := pgx.ParseConfig(cfg.GetDSN())
if err != nil {
return nil, errors.Wrap(err, "create db conn pool")
}
pool := stdlib.OpenDB(*poolCfg)
if err = PingConnection(ctx, &cfg, func(pingCtx context.Context) error {
return pool.PingContext(pingCtx)
}); err != nil {
return nil, errors.Wrap(err, "create db conn pool")
}
return pool, nil
}
func PingConnection(ctx context.Context, cfg *Config, pinger func(ctx context.Context) error) error {
ticker := time.NewTicker(cfg.ConnTimeout)
defer ticker.Stop()
var err error
for i := 0; i < cfg.ConnRetry; i++ {
switch err = pinger(ctx); err {
case nil:
return nil
case context.Canceled, context.DeadlineExceeded:
return errors.Wrap(err, "ping database connection")
default:
zap.L().Error("failed to ping database connection", zap.Error(err))
}
select {
case <-ctx.Done():
return errors.Wrap(ctx.Err(), "ping database connection")
case <-ticker.C:
}
}
return errors.Wrap(err, "ping database connection")
}
func (d *PGX) Acquire(ctx context.Context) (Conn, error) {
return d.pool.Acquire(ctx)
}
func (d *PGX) Close() error {
d.pool.Close()
return nil
}
func (d *PGX) CollectMetrics(ctx context.Context) error {
done := make(chan struct{})
go func() {
s := d.pool.Stat()
totalConns.Set(float64(s.TotalConns()))
acquireCount.Set(float64(s.AcquireCount()))
acquiredCount.Set(float64(s.AcquiredConns()))
canceledAcquireCount.Set(float64(s.CanceledAcquireCount()))
constructingConns.Set(float64(s.ConstructingConns()))
emptyAcquireCount.Set(float64(s.EmptyAcquireCount()))
idleConns.Set(float64(s.IdleConns()))
maxConns.Set(float64(s.MaxConns()))
newConnsCount.Set(float64(s.NewConnsCount()))
maxLifetimeDestroyCount.Set(float64(s.MaxLifetimeDestroyCount()))
maxIdleDestroyCount.Set(float64(s.MaxIdleDestroyCount()))
close(done)
}()
select {
case <-done:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment