Skip to content

Instantly share code, notes, and snippets.

@craigfurman
Created March 31, 2023 09:59
Show Gist options
  • Save craigfurman/20ca2e8b6440e1591f64ef94f0b1ee90 to your computer and use it in GitHub Desktop.
Save craigfurman/20ca2e8b6440e1591f64ef94f0b1ee90 to your computer and use it in GitHub Desktop.
Example of using postgres+pgx to implement a distributed mutex in Go
package main
import (
"context"
"fmt"
"log"
"math/rand"
"os"
"time"
"github.com/jackc/pgx/v5/pgxpool"
"golang.org/x/sync/errgroup"
)
const (
// workerCount is the number of worker goroutines that main starts.
workerCount = 10
// lockID is the key every worker passes to pg_try_advisory_xact_lock, so
// all workers contend for the same advisory lock.
lockID = 42
)
var (
// connPool is the shared pgx connection pool. main creates it from the
// POSTGRES_DSN environment variable; workers begin their transactions on it.
connPool *pgxpool.Pool
// logger is the shared logger. main configures it to write to stdout with
// the standard log flags.
logger *log.Logger
)
// main configures the shared logger and connection pool, starts workerCount
// workers under an errgroup, and panics with the first error that the group
// reports (or with the pool-creation error).
func main() {
	logger = log.New(os.Stdout, "", log.LstdFlags)

	// The pool is created from the DSN found in the POSTGRES_DSN environment
	// variable; failing to create it is fatal.
	pool, err := pgxpool.New(context.Background(), os.Getenv("POSTGRES_DSN"))
	connPool = pool
	must(err)

	// The group's context is what every worker watches for cancellation.
	workers, ctx := errgroup.WithContext(context.Background())
	for n := 0; n < workerCount; n++ {
		id := n // per-iteration copy so each closure sees its own worker id
		workers.Go(func() error {
			return worker(ctx, id)
		})
	}

	must(workers.Wait())
}
// worker competes with the other workers for the Postgres advisory lock
// identified by lockID until ctx is cancelled or a database call fails.
//
// Every attempt to hold the lock runs in its own transaction (see
// lockSession). When a session ends because the worker voluntarily gave the
// lock up, worker starts a fresh session. It therefore only ever returns a
// non-nil error: the database error that ended a session, or ctx.Err()
// wrapped with the worker id.
func worker(ctx context.Context, id int) error {
	// Loop rather than recurse: restarting via recursion would grow the stack
	// without bound and keep every earlier session's ticker alive until the
	// whole call chain unwound.
	for {
		logger.Printf("worker %d: starting\n", id)
		if err := lockSession(ctx, id); err != nil {
			return err
		}
	}
}

// lockSession opens one transaction on connPool and, on every tick, tries to
// take the transaction-scoped advisory lock lockID inside it.
//
// It returns nil after the worker held the lock and committed the transaction
// to release it, signalling the caller to start a new session. Any other
// outcome (failure to begin, query, or commit; ctx cancellation) is returned
// as a non-nil error. The transaction is always finished before returning so
// its connection goes back to the pool.
func lockSession(ctx context.Context, id int) error {
	const (
		lockStatement   = `SELECT pg_try_advisory_xact_lock($1)`
		rollbackTimeout = 5 * time.Second
	)

	txn, err := connPool.Begin(ctx)
	if err != nil {
		// Report the failure through the errgroup instead of panicking
		// inside a goroutine.
		return fmt.Errorf("worker %d: beginning transaction: %w", id, err)
	}
	defer func() {
		// ctx may already be cancelled when we get here, so roll back under
		// a fresh, bounded context. After a Commit the transaction is
		// already closed and Rollback just reports that, which is why the
		// error is deliberately ignored.
		rollbackCtx, cancel := context.WithTimeout(context.Background(), rollbackTimeout)
		defer cancel()
		_ = txn.Rollback(rollbackCtx)
	}()

	// Poll roughly every four seconds, offset by up to half a second either
	// way so the workers do not all tick in lockstep.
	jitter := time.Millisecond * time.Duration(rand.Intn(1000)-500)
	tick := time.NewTicker((time.Second * 4) + jitter)
	defer tick.Stop()

	for {
		select {
		case <-tick.C:
			var hasLock bool
			if err := txn.QueryRow(ctx, lockStatement, lockID).Scan(&hasLock); err != nil {
				return fmt.Errorf("worker %d: querying advisory lock: %w", id, err)
			}
			if hasLock {
				logger.Printf("worker %d: I have the lock\n", id)
			} else {
				logger.Printf("worker %d: I do not have the lock\n", id)
			}

			// Roughly 30% of the time a lock holder yields so that other
			// workers get a turn.
			randomShouldYield := rand.Float64() > 0.7
			if hasLock && randomShouldYield {
				logger.Printf("worker %d: releasing lock and restarting", id)
				// Ending the transaction is what releases the lock.
				if err := txn.Commit(ctx); err != nil {
					return fmt.Errorf("worker %d: committing transaction: %w", id, err)
				}
				return nil
			}
		case <-ctx.Done():
			return fmt.Errorf("stopping worker %d: %w", id, ctx.Err())
		}
	}
}
// must panics with err when err is non-nil and does nothing otherwise. It is
// meant for failures the program cannot continue past.
func must(err error) {
	if err == nil {
		return
	}
	panic(err)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment