Created
December 23, 2019 15:44
-
-
Save jiacai2050/046b9281c70c7d507bef904f9533665f to your computer and use it in GitHub Desktop.
https://github.com/influxdata/influxdb-comparisons with prometheus support
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// bulk_load_prometheus fills prometheus tsdb with data (opentsdb-style) from stdin. | |
// | |
// The caller is responsible for assuring that the database is empty before | |
// bulk load. | |
package main | |
import ( | |
"bufio" | |
"encoding/json" | |
"flag" | |
"fmt" | |
"io" | |
"log" | |
"net/http" | |
_ "net/http/pprof" | |
"os" | |
"runtime" | |
"strings" | |
"sync" | |
"time" | |
"github.com/pkg/errors" | |
"github.com/prometheus/client_golang/prometheus" | |
"github.com/prometheus/common/model" | |
"github.com/prometheus/prometheus/pkg/labels" | |
"github.com/prometheus/prometheus/storage" | |
"github.com/prometheus/prometheus/storage/tsdb" | |
) | |
var ( | |
dbPath string | |
input string | |
numWrite int | |
walCompress bool | |
) | |
func init() { | |
log.SetFlags(log.Lshortfile | log.LstdFlags) | |
flag.StringVar(&dbPath, "dbPath", "/tmp", "prometheus data path") | |
flag.StringVar(&input, "input", "/tmp/input", "ingest data") | |
flag.IntVar(&numWrite, "numWrite", runtime.NumCPU()+1, "write concurrence") | |
flag.BoolVar(&walCompress, "walCompress", false, "wal compression") | |
go func() { | |
if err := http.ListenAndServe("0.0.0.0:8000", nil); err != nil { | |
fmt.Fprintf(os.Stderr, "Start profile server failed: %v", err) | |
} | |
}() | |
} | |
type testStorage struct { | |
s storage.Storage | |
dir string | |
wg *sync.WaitGroup | |
acc uint64 | |
} | |
type logFunc func(kvs ...interface{}) | |
func (f logFunc) Log(kvs ...interface{}) error { | |
f(kvs) | |
return nil | |
} | |
func initStorage(path string) *testStorage { | |
err := os.Mkdir(path, os.ModePerm) | |
if err != nil { | |
panic(err) | |
} | |
l := func(kvs ...interface{}) { | |
log.Printf("kvs = %+v", kvs) | |
} | |
db, err := tsdb.Open(path, logFunc(l), prometheus.DefaultRegisterer, &tsdb.Options{ | |
MinBlockDuration: model.Duration(1 * time.Second), | |
MaxBlockDuration: model.Duration(10 * time.Second), | |
RetentionDuration: model.Duration(365 * 24 * time.Hour), | |
WALSegmentSize: 0, | |
WALCompression: walCompress, | |
}) | |
if err != nil { | |
panic(err) | |
} | |
return &testStorage{s: tsdb.Adapter(db, int64(0)), dir: path} | |
} | |
func (s *testStorage) Close() error { | |
s.wg.Wait() | |
log.Println("close db...") | |
return s.s.Close() | |
// if err := s.s.Close(); err != nil { | |
// return err | |
// } | |
// return os.RemoveAll(s.dir) | |
} | |
type point struct { | |
Metric string | |
Timestamp int64 | |
Tags map[string]string | |
Value float64 | |
} | |
func (p point) translate() (labels.Labels, int64, float64) { | |
ls := make(labels.Labels, 0, len(p.Tags)+1) | |
for k, v := range p.Tags { | |
ls = append(ls, labels.Label{ | |
Name: k, | |
Value: v, | |
}) | |
} | |
ls = append(ls, labels.Label{ | |
Name: labels.MetricName, | |
Value: strings.ReplaceAll(p.Metric, ".", "_"), | |
}) | |
return ls, p.Timestamp, p.Value | |
} | |
func (s *testStorage) Ingest(c chan<- point) { | |
defer close(c) | |
f, err := os.Open(input) | |
if err != nil { | |
panic(err) | |
} | |
buf := bufio.NewReader(f) | |
for { | |
bs, err := buf.ReadBytes('\n') | |
if err != nil { | |
if err == io.EOF { | |
log.Println("ingest done") | |
break | |
} else { | |
log.Println(errors.Wrap(err, "read line failed")) | |
continue | |
} | |
} | |
// log.Printf("read %s\n", bs) | |
var p point | |
err = json.Unmarshal(bs, &p) | |
if err != nil { | |
log.Println(errors.Wrap(err, "json unmarshal failed")) | |
continue | |
} | |
// atomic.AddUint64(&s.acc, 1) | |
s.acc++ | |
c <- p | |
} | |
} | |
func (s *testStorage) Write(c <-chan point) { | |
defer s.wg.Done() | |
i := 0 | |
a, err := s.s.Appender() | |
if err != nil { | |
panic(err) | |
} | |
for p := range c { | |
i++ | |
ls, t, v := p.translate() | |
_, err = a.Add(ls, t, v) | |
if err != nil { | |
log.Println(err) | |
continue | |
} | |
if i%10 == 0 { | |
err = a.Commit() | |
if err != nil { | |
log.Println(err) | |
} | |
} | |
} | |
// commit remaining points | |
err = a.Commit() | |
if err != nil { | |
log.Println(err) | |
} | |
} | |
func main() { | |
flag.Parse() | |
s := initStorage(dbPath) | |
var wg sync.WaitGroup | |
wg.Add(numWrite) | |
s.wg = &wg | |
log.Printf("start promethues load %+v...", s) | |
c := make(chan point, 100000) | |
go s.Ingest(c) | |
for i := 0; i < numWrite; i++ { | |
go s.Write(c) | |
} | |
log.Println("wait write...") | |
closed := make(chan struct{}, 1) | |
<-closed | |
err := s.Close() | |
if err != nil { | |
log.Printf("close db failed: %v\n", err) | |
} | |
log.Printf("write done, total %v points\n", s.acc) | |
} |
Author
jiacai2050
commented
Dec 23, 2019
- https://groups.google.com/d/msg/prometheus-users/9yb481QbZpg/v3j1DJMsBAAJ
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment