@mikberg
Created December 22, 2022 12:58
Promscale metric data retention job
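
A standalone Go CLI that runs Promscale's metric data retention out of band: it asks _prom_catalog.get_metrics_that_need_drop_chunk() for metrics whose chunks are past their retention period, then locks each metric, calls _prom_catalog.drop_metric_chunks, and unlocks it again, spreading the work across a small pool of workers and backing off when the database gets slow. Connection details come from --database-url (DATABASE_URL), concurrency from --max-workers, and --conn-lifetime bounds how long each connection is reused.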
package main

import (
"context"
"errors"
"fmt"
"math/rand"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/alecthomas/kong"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)
var MaxWorkers = 4
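
// Work is a single unit of maintenance work to run against the database.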
type Work interface {
	Do(ctx context.Context, db *pgxpool.Pool) error
}
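
// MetricRetentionWork identifies a single metric whose expired chunks should be dropped.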
type MetricRetentionWork struct {
	ID          int
	TableSchema string
	MetricName  string
}
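
// Do locks the metric for maintenance, drops its chunks that are older than the
// metric's retention period, and unlocks it again. If the lock cannot be acquired
// immediately, the metric is skipped.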
func (w MetricRetentionWork) Do(ctx context.Context, db *pgxpool.Pool) error {
	start := time.Now()
	log := log.With().Str("metric_name", w.MetricName).Logger()
	log.Debug().Msg("handling metric")
	var locked bool
	if err := db.QueryRow(ctx, "SELECT _prom_catalog.lock_metric_for_maintenance($1, wait=>false)", w.ID).Scan(&locked); err != nil {
		return fmt.Errorf("failed to lock metric for maintenance: %w", err)
	}
	if !locked {
		log.Warn().Msg("skipping metric, unable to lock")
		return nil
	}
	log.Debug().Msg("metric locked for maintenance")
	if _, err := db.Exec(ctx, "CALL _prom_catalog.drop_metric_chunks($1, $2, NOW() - _prom_catalog.get_metric_retention_period($1, $2), log_verbose=>true)", w.TableSchema, w.MetricName); err != nil {
		return fmt.Errorf("failed to drop metric chunks: %w", err)
	}
	log.Debug().Msg("called drop_metric_chunks")
	if _, err := db.Exec(ctx, "SELECT _prom_catalog.unlock_metric_for_maintenance($1)", w.ID); err != nil {
		return fmt.Errorf("failed to unlock metric: %w", err)
	}
	log.Debug().Msg("unlocked metric")
	// not sure if this helps
	if _, err := db.Exec(ctx, "COMMIT"); err != nil {
		return fmt.Errorf("failed to commit: %w", err)
	}
	log.Info().Dur("duration", time.Since(start)).Msg("done handling metric retention work")
	return nil
}
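
// Globals holds flags shared by all commands.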
type Globals struct {
	LogLevel string `type:"string" name:"log-level" default:"info" env:"LOG_LEVEL"`
}
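
// App is the root of the CLI; the maintain command runs by default.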
type App struct {
	Globals
	Maintain MaintainCmd `cmd:"" default:""`
}
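
// MaintainCmd runs one maintenance pass: it drains the backlog of metrics that
// need chunks dropped, using a bounded pool of database connections and workers.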
type MaintainCmd struct {
	DatabaseURL  string        `type:"string" name:"database-url" required:"" env:"DATABASE_URL"`
	MaxWorkers   int32         `name:"max-workers" env:"MAX_WORKERS" default:"3"`
	ConnLifetime time.Duration `name:"conn-lifetime" env:"CONN_LIFETIME" default:"3m" help:"How long to let each db connection run before recycling it"`

	db *pgxpool.Pool
}
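
// Run connects to the database, starts the work producer and workers, and blocks
// until either all work is done or a termination signal arrives.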
func (c *MaintainCmd) Run() error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// create database connection config
	connCfg, err := pgxpool.ParseConfig(c.DatabaseURL)
	if err != nil {
		return fmt.Errorf("failed to parse database url: %w", err)
	}
	connCfg.MaxConnLifetime = c.ConnLifetime
	connCfg.MaxConns = c.MaxWorkers

	// connect to database
	db, err := pgxpool.ConnectConfig(ctx, connCfg)
	if err != nil {
		return fmt.Errorf("failed to connect to database: %w", err)
	}
	c.db = db
	defer c.db.Close()

	// start producing work
	errcs := []<-chan error{}
	workc, errc := c.produceWork(ctx)
	errcs = append(errcs, errc)
	for idx := 0; idx < int(c.MaxWorkers); idx++ {
		errc := c.startWorker(ctx, workc)
		errcs = append(errcs, errc)
	}
	donec := c.wait(errcs)

	sigc := make(chan os.Signal, 1)
	signal.Notify(sigc, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)

	select {
	case s := <-sigc:
		log.Info().Msgf("signal received [%v] cancelling everything ...", s)
		cancel()
	case <-donec:
		log.Info().Msg("done!")
	}
	return nil
}
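
// wait fans in the error channels from the producer and workers, logging every
// non-cancellation error, and closes the returned channel once they all finish.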
func (c *MaintainCmd) wait(errcs []<-chan error) <-chan struct{} {
	out := make(chan struct{})
	var wg sync.WaitGroup
	waiter := func(errc <-chan error) {
		defer wg.Done()
		for err := range errc {
			if err != nil && !errors.Is(err, context.Canceled) {
				log.Error().Err(err).Msg("error occurred")
			}
		}
	}
	wg.Add(len(errcs))
	for _, errc := range errcs {
		go waiter(errc)
	}
	go func() {
		defer close(out)
		wg.Wait()
	}()
	return out
}
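
// startWorker consumes Work from workc until the channel is closed, forwarding
// any errors on the returned channel.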
func (c *MaintainCmd) startWorker(ctx context.Context, workc <-chan Work) <-chan error {
	errc := make(chan error, 1)
	go func() {
		defer close(errc)
		for work := range workc {
			log.Debug().Msgf("worker received work: %+v", work)
			if err := work.Do(ctx, c.db); err != nil {
				errc <- err
				continue
			}
		}
		log.Debug().Msg("worker is done")
	}()
	return errc
}
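
// produceWork repeatedly fetches batches of metrics that need chunks dropped and
// feeds them to the workers. It tracks how long each send blocks as a rough load
// signal and sleeps between sends ("kalm") when the average gets high, and it
// stops once a batch comes back less than half full.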
func (c *MaintainCmd) produceWork(ctx context.Context) (<-chan Work, <-chan error) {
	workc := make(chan Work)
	errc := make(chan error, 1)
	buflen := c.MaxWorkers * 3
	rollingLatencies := make([]time.Duration, buflen)
	var kalm time.Duration
	getAvgLatency := func() time.Duration {
		sum := time.Duration(0)
		for _, l := range rollingLatencies {
			sum += l
		}
		return time.Duration(float64(sum) / float64(len(rollingLatencies)) * float64(c.MaxWorkers))
	}
	go func() {
		defer close(workc)
		defer close(errc)
		stopWhenDone := false
		for {
			log.Info().Msg("looking for more work")
			work, err := c.createMetricRetentionWork(ctx, 100)
			if err != nil {
				errc <- err
				return
			}
			log.Info().Msgf("got %d units of fresh work", len(work))
			if len(work) < 50 {
				stopWhenDone = true
			}
			for idx, work := range work {
				start := time.Now()
				select {
				case workc <- work:
					log.Debug().Msg("sent work")
					rollingLatencies[idx%len(rollingLatencies)] = time.Since(start)
				case <-ctx.Done():
					// produce no more work; workers will continue until their next cycle
					return
				}
				// every once in a while, check if we should increase calmness
				if idx%int(c.MaxWorkers) == 0 {
					avgLatency := getAvgLatency()
					// if latency is high, enhance our calm
					if avgLatency > 10*time.Second {
						log.Warn().Dur("average_latency", avgLatency).Msg("average latency is high, calming down ...")
						kalm = avgLatency
					} else {
						if kalm > 0 {
							log.Info().Dur("average_latency", avgLatency).Msg("speeding up again")
						}
						kalm = 0
					}
				}
				time.Sleep(kalm)
			}
			if stopWhenDone {
				log.Info().Msg("stopping work production")
				return
			}
		}
	}()
	return workc, errc
}
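
// createMetricRetentionWork asks Promscale for up to limit metrics that need
// chunks dropped and returns them as Work items in shuffled order.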
func (c *MaintainCmd) createMetricRetentionWork(ctx context.Context, limit int) ([]Work, error) {
	work := []Work{}
	rows, err := c.db.Query(ctx, "select id, table_schema, metric_name from _prom_catalog.get_metrics_that_need_drop_chunk() limit $1", limit)
	if err != nil {
		return nil, fmt.Errorf("failed to query for metrics that need drop chunk: %w", err)
	}
	defer rows.Close()
	var w MetricRetentionWork
	for rows.Next() {
		if err := rows.Scan(&w.ID, &w.TableSchema, &w.MetricName); err != nil {
			return nil, fmt.Errorf("failed to scan row: %w", err)
		}
		work = append(work, w)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("failed to read rows: %w", err)
	}
	// shuffle the work
	rand.Shuffle(len(work), func(i, j int) { work[i], work[j] = work[j], work[i] })
	return work, nil
}
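
// main parses the CLI with kong, configures zerolog's level and duration unit,
// and runs the selected command.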
func main() {
	app := App{
		Globals: Globals{},
	}
	cli := kong.Parse(
		&app,
		kong.Name("promscale-maintainer"),
		kong.UsageOnError(),
	)
	zerolog.DurationFieldUnit = time.Second
	if lvl, err := zerolog.ParseLevel(app.LogLevel); err != nil {
		cli.FatalIfErrorf(err)
	} else {
		zerolog.SetGlobalLevel(lvl)
	}
	cli.FatalIfErrorf(cli.Run(app.Globals))
}