Skip to content

Instantly share code, notes, and snippets.

@jiacai2050
Created December 23, 2019 15:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jiacai2050/046b9281c70c7d507bef904f9533665f to your computer and use it in GitHub Desktop.
Save jiacai2050/046b9281c70c7d507bef904f9533665f to your computer and use it in GitHub Desktop.
// bulk_load_prometheus fills a Prometheus TSDB with OpenTSDB-style JSON points
// read line by line from the file given by -input.
//
// The caller is responsible for ensuring that the database is empty before
// bulk load (the -dbPath directory must not already exist).
package main
import (
	"bufio"
	"encoding/json"
	"flag"
	"fmt"
	"io"
	"log"
	"net/http"
	_ "net/http/pprof"
	"os"
	"runtime"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/storage/tsdb"
)
// Command-line configuration, populated from the flags registered in init.
var (
	// dbPath is the directory in which the TSDB is created and opened.
	dbPath string
	// input names the file of newline-delimited JSON points to load.
	input string
	// numWrite is how many Write workers run concurrently.
	numWrite int
	// walCompress is passed through as the TSDB WALCompression option.
	walCompress bool
)
// init configures the standard logger, registers the command-line flags and
// starts a best-effort HTTP server on port 8000. The blank import of
// net/http/pprof registers the profiling handlers on the default mux that
// this server uses.
func init() {
	log.SetFlags(log.Lshortfile | log.LstdFlags)
	// NOTE(review): initStorage calls os.Mkdir on this path, so the "/tmp"
	// default always fails; callers effectively must pass -dbPath.
	flag.StringVar(&dbPath, "dbPath", "/tmp", "prometheus data path")
	flag.StringVar(&input, "input", "/tmp/input", "ingest data")
	flag.IntVar(&numWrite, "numWrite", runtime.NumCPU()+1, "write concurrence")
	flag.BoolVar(&walCompress, "walCompress", false, "wal compression")
	go func() {
		// NOTE(review): 0.0.0.0 exposes the profiling endpoints on every
		// interface; bind to localhost if remote access is not intended.
		if err := http.ListenAndServe("0.0.0.0:8000", nil); err != nil {
			// Terminate the line so the message does not run into later output.
			fmt.Fprintf(os.Stderr, "Start profile server failed: %v\n", err)
		}
	}()
}
// testStorage bundles the storage being loaded with the bookkeeping the
// loader needs while ingesting and writing points.
type testStorage struct {
	// s is the Prometheus storage that points are appended to.
	s storage.Storage
	// dir is the directory the storage was opened in.
	dir string
	// wg tracks the Write workers; Close waits on it.
	wg *sync.WaitGroup
	// acc counts points successfully decoded by Ingest.
	acc uint64
}
// logFunc adapts a plain variadic function into a logger exposing
// Log(keyvals ...interface{}) error, which is what tsdb.Open is given here.
type logFunc func(kvs ...interface{})

// Log forwards the key/value pairs to f and always returns nil.
func (f logFunc) Log(kvs ...interface{}) error {
	// Spread the slice so f receives the same arguments Log received;
	// calling f(kvs) would pass one []interface{} element instead.
	f(kvs...)
	return nil
}
// initStorage creates the directory at path (os.Mkdir fails if it already
// exists), opens a Prometheus TSDB there and wraps it in a testStorage.
// Any failure panics: the loader cannot do anything useful without a DB.
func initStorage(path string) *testStorage {
	if mkErr := os.Mkdir(path, os.ModePerm); mkErr != nil {
		panic(mkErr)
	}

	// Route TSDB log output through the standard logger.
	tsdbLogger := logFunc(func(kvs ...interface{}) {
		log.Printf("kvs = %+v", kvs)
	})

	opts := &tsdb.Options{
		MinBlockDuration:  model.Duration(1 * time.Second),
		MaxBlockDuration:  model.Duration(10 * time.Second),
		RetentionDuration: model.Duration(365 * 24 * time.Hour),
		WALSegmentSize:    0,
		WALCompression:    walCompress,
	}

	db, openErr := tsdb.Open(path, tsdbLogger, prometheus.DefaultRegisterer, opts)
	if openErr != nil {
		panic(openErr)
	}

	return &testStorage{
		s:   tsdb.Adapter(db, int64(0)),
		dir: path,
	}
}
// Close waits for the Write workers tracked by s.wg (if one was set) to
// finish and then closes the underlying storage, returning its error.
// The data directory is intentionally left on disk.
func (s *testStorage) Close() error {
	// initStorage leaves wg nil; only main installs one, so guard against a
	// nil-pointer panic when Close is used without workers.
	if s.wg != nil {
		s.wg.Wait()
	}
	log.Println("close db...")
	return s.s.Close()
}
// point is one OpenTSDB-style sample decoded from a single JSON line of the
// input file, e.g. {"metric":"cpu.usage","timestamp":1,"tags":{"host":"h1"},"value":0.5}.
// The explicit json tags document the expected keys instead of relying on
// encoding/json's case-insensitive field-name matching.
type point struct {
	Metric    string            `json:"metric"`
	Timestamp int64             `json:"timestamp"`
	Tags      map[string]string `json:"tags"`
	Value     float64           `json:"value"`
}
// translate converts p into the form the Prometheus appender takes:
//   - every tag becomes a label;
//   - the metric name, with '.' replaced by '_', becomes the __name__ label;
//   - the timestamp and value are returned unchanged.
//
// labels.Labels is required to be sorted by name. Ranging over a Go map
// yields a random order, so the slice is sorted before returning; otherwise
// the same series could reach the TSDB with differently ordered labels.
//
// NOTE(review): Prometheus expects millisecond timestamps; confirm the input
// generator emits milliseconds.
func (p point) translate() (labels.Labels, int64, float64) {
	ls := make(labels.Labels, 0, len(p.Tags)+1)
	for k, v := range p.Tags {
		ls = append(ls, labels.Label{Name: k, Value: v})
	}
	ls = append(ls, labels.Label{
		Name:  labels.MetricName,
		Value: strings.ReplaceAll(p.Metric, ".", "_"),
	})
	sort.Sort(ls)
	return ls, p.Timestamp, p.Value
}
// Ingest reads newline-delimited JSON points from the file named by the
// -input flag and sends each successfully decoded point on c. It increments
// s.acc once per point sent. It closes c on return, so Write workers ranging
// over c terminate.
//
// Lines that fail to decode are logged and skipped. A read error other than
// io.EOF is logged and ends ingestion, because retrying a failing reader in a
// loop could spin forever. Failing to open the input panics.
func (s *testStorage) Ingest(c chan<- point) {
	defer close(c)
	f, err := os.Open(input)
	if err != nil {
		panic(err)
	}
	defer f.Close()

	r := bufio.NewReader(f)
	for {
		line, readErr := r.ReadBytes('\n')
		if readErr != nil && readErr != io.EOF {
			log.Println(errors.Wrap(readErr, "read line failed"))
			return
		}
		// ReadBytes returns the last line together with io.EOF when the file
		// does not end in '\n', so decode any data before checking for EOF.
		if len(line) > 0 {
			var p point
			if err := json.Unmarshal(line, &p); err != nil {
				log.Println(errors.Wrap(err, "json unmarshal failed"))
			} else {
				s.acc++
				c <- p
			}
		}
		if readErr == io.EOF {
			log.Println("ingest done")
			return
		}
	}
}
// Write drains points from c into the storage.
//   - It commits after every commitBatch successfully added samples, and once
//     more after c is closed.
//   - Add and Commit errors are logged.
//   - Failing to obtain an Appender panics.
//   - It marks s.wg done on return.
//
// A fresh Appender is obtained after every Commit, because a Prometheus
// Appender must not be used again once it has been committed (the TSDB head
// appender releases its internal buffers in Commit).
func (s *testStorage) Write(c <-chan point) {
	defer s.wg.Done()
	// commitBatch is the number of samples added per Commit.
	const commitBatch = 10

	a, err := s.s.Appender()
	if err != nil {
		panic(err)
	}
	pending := 0
	for p := range c {
		ls, t, v := p.translate()
		if _, err := a.Add(ls, t, v); err != nil {
			log.Println(err)
			continue
		}
		pending++
		if pending < commitBatch {
			continue
		}
		if err := a.Commit(); err != nil {
			log.Println(err)
		}
		pending = 0
		// Never reuse a committed appender; start a new batch.
		if a, err = s.s.Appender(); err != nil {
			panic(err)
		}
	}
	// Commit the final, possibly partial, batch.
	if err := a.Commit(); err != nil {
		log.Println(err)
	}
}
func main() {
flag.Parse()
s := initStorage(dbPath)
var wg sync.WaitGroup
wg.Add(numWrite)
s.wg = &wg
log.Printf("start promethues load %+v...", s)
c := make(chan point, 100000)
go s.Ingest(c)
for i := 0; i < numWrite; i++ {
go s.Write(c)
}
log.Println("wait write...")
closed := make(chan struct{}, 1)
<-closed
err := s.Close()
if err != nil {
log.Printf("close db failed: %v\n", err)
}
log.Printf("write done, total %v points\n", s.acc)
}
@jiacai2050
Copy link
Author

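# generate OpenTSDB-format metrics (run in the foreground: the loader stops at
# the first EOF, so the input file must be complete before loading starts)
./bin/bulk_data_gen -format opentsdb \
  -scale-var 100 \
  -timestamp-start "2019-12-02T00:00:00Z" \
  -timestamp-end "2019-12-02T01:00:00Z" \
  -seed 20191221 > "$input"

# ingest the metrics into a Prometheus TSDB ($prom_data must not exist yet)
./bin/bulk_load_prometheus -numWrite 10 -input "$input" -dbPath "$prom_data"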

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment