Skip to content

Instantly share code, notes, and snippets.

@bwplotka
Last active December 19, 2020 09:52
Show Gist options
  • Save bwplotka/cbcbbcd1802181b7785da11dcc0f5cfd to your computer and use it in GitHub Desktop.
Save bwplotka/cbcbbcd1802181b7785da11dcc0f5cfd to your computer and use it in GitHub Desktop.
Thanos Go Benchmarks Real Block Queries
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.
package manual
import (
"context"
"io/ioutil"
"os"
"testing"
"time"
"github.com/go-kit/kit/log"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/storepb"
storetestutil "github.com/thanos-io/thanos/pkg/store/storepb/testutil"
"github.com/thanos-io/thanos/pkg/testutil"
)
// Versioned here: https://gist.github.com/bwplotka/cbcbbcd1802181b7785da11dcc0f5cfd
var (
testCases = []struct {
bktConfig string
cases []*storetestutil.SeriesCase
}{
{
bktConfig: `/home/bwplotka/Repos/_dev/thanos/datahub/bucket.yaml`,
cases: []*storetestutil.SeriesCase{
{
Name: "alerts2w/01DN3SK96XDAEKRB1AN30AAW6E",
Req: &storepb.SeriesRequest{
MinTime: 1567641600000,
MaxTime: 1568851200000,
Matchers: []storepb.LabelMatcher{
{
Name: "__name__", Value: "alerts",
Type: storepb.LabelMatcher_EQ,
},
},
SkipChunks: true,
},
ExpectedSeries: make([]*storepb.Series, 224002),
},
{
// Simulate instant query (still 2k results).
Name: "alerts15s/01DN3SK96XDAEKRB1AN30AAW6E",
Req: &storepb.SeriesRequest{
MinTime: 1568851200000 - int64((15*time.Second)/time.Millisecond),
MaxTime: 1568851200000,
Matchers: []storepb.LabelMatcher{
{
Name: "__name__", Value: "alerts",
Type: storepb.LabelMatcher_EQ,
},
},
SkipChunks: true,
},
ExpectedSeries: make([]*storepb.Series, 2074),
},
{
Name: "subssyncs2w/01DN3SK96XDAEKRB1AN30AAW6E",
Req: &storepb.SeriesRequest{
MinTime: 1567641600000,
MaxTime: 1568851200000,
Matchers: []storepb.LabelMatcher{
{
Name: "__name__", Value: "subscription_sync_total",
Type: storepb.LabelMatcher_EQ,
},
},
SkipChunks: true,
},
ExpectedSeries: make([]*storepb.Series, 27951),
},
{
Name: "subs2w/01DN3SK96XDAEKRB1AN30AAW6E",
Req: &storepb.SeriesRequest{
MinTime: 1567641600000,
MaxTime: 1568851200000,
Matchers: []storepb.LabelMatcher{
{
Name: "__name__", Value: "subscription_labels",
Type: storepb.LabelMatcher_EQ,
},
},
SkipChunks: true,
},
ExpectedSeries: make([]*storepb.Series, 536813),
},
{
Name: "subs2w1account/01DN3SK96XDAEKRB1AN30AAW6E",
Req: &storepb.SeriesRequest{
MinTime: 1567641600000,
MaxTime: 1568851200000,
Matchers: []storepb.LabelMatcher{
{
Name: "__name__", Value: "subscription_labels",
Type: storepb.LabelMatcher_EQ,
},
{
Name: "ebs_account", Value: "6274079",
Type: storepb.LabelMatcher_EQ,
},
},
SkipChunks: true,
},
ExpectedSeries: make([]*storepb.Series, 260074), // 0.9s
},
{
Name: "subs2w1accountWithChunks/01DN3SK96XDAEKRB1AN30AAW6E",
Req: &storepb.SeriesRequest{
MinTime: 1567641600000,
MaxTime: 1568851200000,
Matchers: []storepb.LabelMatcher{
{
Name: "__name__", Value: "subscription_labels",
Type: storepb.LabelMatcher_EQ,
},
{
Name: "ebs_account", Value: "6274079",
Type: storepb.LabelMatcher_EQ,
},
},
},
ExpectedSeries: make([]*storepb.Series, 260074), // 4-5s
},
{
Name: "subs15m1account/01DN3SK96XDAEKRB1AN30AAW6E",
Req: &storepb.SeriesRequest{
MinTime: 1568835900000,
MaxTime: 1568836800000,
Matchers: []storepb.LabelMatcher{
{
Name: "__name__", Value: "subscription_labels",
Type: storepb.LabelMatcher_EQ,
},
{
Name: "ebs_account", Value: "6274079",
Type: storepb.LabelMatcher_EQ,
},
},
SkipChunks: true,
},
ExpectedSeries: make([]*storepb.Series, 16842), // 0.5-0.6s
},
{
Name: "subs15m1accountWithChunks/01DN3SK96XDAEKRB1AN30AAW6E",
Req: &storepb.SeriesRequest{
MinTime: 1568835900000,
MaxTime: 1568836800000,
Matchers: []storepb.LabelMatcher{
{
Name: "__name__", Value: "subscription_labels",
Type: storepb.LabelMatcher_EQ,
},
{
Name: "ebs_account", Value: "6274079",
Type: storepb.LabelMatcher_EQ,
},
},
},
ExpectedSeries: make([]*storepb.Series, 16842), // 1-1.5s
},
{
// Simulate instant query.
Name: "subs15s/01DN3SK96XDAEKRB1AN30AAW6E",
Req: &storepb.SeriesRequest{
MinTime: 1568851200000 - int64((15*time.Second)/time.Millisecond),
MaxTime: 1568851200000,
Matchers: []storepb.LabelMatcher{
{
Name: "__name__", Value: "subscription_labels",
Type: storepb.LabelMatcher_EQ,
},
},
SkipChunks: true,
},
ExpectedSeries: make([]*storepb.Series, 0),
},
},
},
{
// 200 MB index header. 166521739814ns (~2m) to create index header!
bktConfig: "/home/bwplotka/Repos/_dev/telemeter/bucket.yaml",
cases: []*storetestutil.SeriesCase{
{
Name: "alerts/01EPXBGA0413QZMTF4XQ0M4Q3E",
Req: &storepb.SeriesRequest{
MinTime: 1603929600000,
MaxTime: 1605139200000,
Matchers: []storepb.LabelMatcher{
{
Name: "__name__", Value: "alerts",
Type: storepb.LabelMatcher_EQ,
},
},
SkipChunks: true,
},
ExpectedSeries: make([]*storepb.Series, 224002),
},
{
Name: "subs/01EPXBGA0413QZMTF4XQ0M4Q3E",
Req: &storepb.SeriesRequest{
MinTime: 1603929600000,
MaxTime: 1605139200000,
Matchers: []storepb.LabelMatcher{
{
Name: "__name__", Value: "subscription_sync_total",
Type: storepb.LabelMatcher_EQ,
},
{
Name: "installed", Value: ".*hyperconverged.*",
Type: storepb.LabelMatcher_RE,
},
},
SkipChunks: true,
},
ExpectedSeries: make([]*storepb.Series, 224002),
},
},
},
}
)
func TestTelemeterRealData_Series(t *testing.T) {
seriesBlock(testutil.NewTB(t))
}
func BenchmarkTelemeterRealData_Series(b *testing.B) {
seriesBlock(testutil.NewTB(b))
}
func seriesBlock(t testutil.TB) {
tmpDir, err := ioutil.TempDir("", "testorbench-manual-bucketseries")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }()
logger := log.NewNopLogger()
if !t.IsBenchmark() {
logger = log.NewLogfmtLogger(os.Stderr)
}
// Hardcoded choice.
bktConfig := testCases[0].bktConfig
//cases := testCases[0].cases
cases := testCases[0].cases[4:8]
//bktConfig := testCases[1].bktConfig
//cases := testCases[1].cases[1:] // Just alerts for now.
b, err := ioutil.ReadFile(bktConfig)
testutil.Ok(t, err)
bkt, err := client.NewBucket(logger, b, nil, "yolo")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, bkt.Close()) }()
f, err := block.NewRawMetaFetcher(logger, bkt)
testutil.Ok(t, err)
st, err := store.NewBucketStore(
logger,
nil,
bkt,
f,
tmpDir,
nil,
nil,
nil,
store.NewChunksLimiterFactory(0),
false,
1,
nil,
false,
store.DefaultPostingOffsetInMemorySampling,
false,
false,
0,
)
testutil.Ok(t, err)
testutil.Ok(t, st.SyncBlocks(context.Background()))
storetestutil.TestServerSeries(t, st, cases...)
}
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.
package manual
import (
"context"
"fmt"
"net"
"net/http"
"path"
"sync"
"testing"
"github.com/cortexproject/cortex/integration/e2e"
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
"github.com/go-kit/kit/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/objstore/s3"
"github.com/thanos-io/thanos/pkg/testutil"
"github.com/thanos-io/thanos/test/e2e/e2ethanos"
)
func httpPrompt(t *testing.T) {
t.Log("Running until you hit \"http://localhost:9191\"")
wg := sync.WaitGroup{}
wg.Add(1)
o := sync.Once{}
l, err := net.Listen("tcp", "localhost:9191")
testutil.Ok(t, err)
go func() {
_ = http.Serve(l, http.HandlerFunc(func(w http.ResponseWriter, request *http.Request) {
o.Do(func() {
wg.Done()
})
_, _ = w.Write([]byte("Finishing"))
}))
}()
wg.Wait()
testutil.Ok(t, l.Close())
t.Log("Finishing")
}
func TestStoreGateway_Manual(t *testing.T) {
s, err := e2e.NewScenario("e2e_test_store_gateway")
testutil.Ok(t, err)
t.Cleanup(e2ethanos.CleanScenario(t, s))
m := e2edb.NewMinio(8080, "thanos")
testutil.Ok(t, s.StartAndWaitReady(m))
s1, err := e2ethanos.NewStoreGW(s.SharedDir(), "1", client.BucketConfig{
Type: client.S3,
Config: s3.Config{
Bucket: "thanos",
AccessKey: e2edb.MinioAccessKey,
SecretKey: e2edb.MinioSecretKey,
Endpoint: m.NetworkHTTPEndpoint(),
Insecure: true,
},
}, relabel.Config{
Action: relabel.Drop,
Regex: relabel.MustNewRegexp("value2"),
SourceLabels: model.LabelNames{"ext1"},
})
testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(s1))
// Ensure bucket UI.
r, err := http.Get("http://" + path.Join(s1.HTTPEndpoint(), "loaded"))
testutil.Ok(t, err)
testutil.Equals(t, http.StatusOK, r.StatusCode)
q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{s1.GRPCNetworkEndpoint()}, nil, nil, "", "")
testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(q))
l := log.NewNopLogger()
bkt, err := s3.NewBucketWithConfig(l, s3.Config{
Bucket: "thanos",
AccessKey: e2edb.MinioAccessKey,
SecretKey: e2edb.MinioSecretKey,
Endpoint: m.HTTPEndpoint(), // We need separate client config, when connecting to minio from outside.
Insecure: true,
}, "test-feed")
testutil.Ok(t, err)
testutil.Ok(t, objstore.UploadDir(context.Background(), l, bkt, "/home/bwplotka/Repos/_dev/thanos/datahub/01DN3SK96XDAEKRB1AN30AAW6E", "01DN3SK96XDAEKRB1AN30AAW6E"))
// Wait for store to sync blocks.
testutil.Ok(t, s1.WaitSumMetrics(e2e.Equals(1), "thanos_blocks_meta_synced"))
testutil.Ok(t, s1.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_sync_failures_total"))
testutil.Ok(t, s1.WaitSumMetrics(e2e.Equals(1), "thanos_bucket_store_blocks_loaded"))
testutil.Ok(t, s1.WaitSumMetrics(e2e.Equals(0), "thanos_bucket_store_block_drops_total"))
testutil.Ok(t, s1.WaitSumMetrics(e2e.Equals(0), "thanos_bucket_store_block_load_failures_total"))
t.Log("Done, waiting. query:", fmt.Sprintf("http://%v/graph?g0.range_input=15m&g0.end_input=2019-09-18%2018%3A20&g0.max_source_resolution=0s&g0.expr=subscription_sync_total&g0.tab=0", q.HTTPEndpoint()))
httpPrompt(t)
t.Log("Finished.")
}
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.
package manual
import (
"context"
"io/ioutil"
"os"
"testing"
"time"
"github.com/go-kit/kit/log"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/prometheus/promql"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/query"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/testutil"
)
const bktConfig = `/home/bwplotka/Repos/_dev/datahub/bucket.yaml`
type queryTestCase struct {
Name string
Query string
MinTime, MaxTime int64
Step time.Duration // If 0, means instant query.
ExpectedSeriesCount int
// Expect result for accuracy?
}
func TestTelemeterRealData_Query(t *testing.T) {
queryBlockThroughStore(testutil.NewTB(t))
}
func BenchmarkTelemeterRealData_Query(b *testing.B) {
queryBlockThroughStore(testutil.NewTB(b))
}
func queryBlockThroughStore(t testutil.TB) {
tmpDir, err := ioutil.TempDir("", "testorbench-manual-query")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }()
logger := log.NewNopLogger()
if !t.IsBenchmark() {
logger = log.NewLogfmtLogger(os.Stderr)
}
b, err := ioutil.ReadFile(bktConfig)
testutil.Ok(t, err)
bkt, err := client.NewBucket(logger, b, nil, "yolo")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, bkt.Close()) }()
f, err := block.NewRawMetaFetcher(logger, bkt)
testutil.Ok(t, err)
st, err := store.NewBucketStore(
logger,
nil,
bkt,
f,
tmpDir,
nil,
nil,
nil,
store.NewChunksLimiterFactory(0),
false,
1,
nil,
false,
store.DefaultPostingOffsetInMemorySampling,
false,
false,
0,
)
testutil.Ok(t, err)
testutil.Ok(t, st.SyncBlocks(context.Background()))
timeout := 5 * time.Minute
opts := promql.EngineOpts{
Logger: nil,
Reg: nil,
MaxSamples: 100e7,
Timeout: timeout,
NoStepSubqueryIntervalFn: func(int64) int64 { return durationMilliseconds(1 * time.Minute) },
}
engine := promql.NewEngine(opts)
for _, c := range []queryTestCase{
{
Name: "alerts2w/01DN3SK96XDAEKRB1AN30AAW6E",
Query: "alerts{}",
MinTime: 1567641600000,
MaxTime: 1568851200000,
Step: 270 * time.Second,
ExpectedSeriesCount: 224002,
},
{
Name: "alerts15s/01DN3SK96XDAEKRB1AN30AAW6E",
Query: "alerts{}",
MaxTime: 1568851200000,
Step: 0, // Instant.
ExpectedSeriesCount: 1,
},
{
Name: "subssyncs2w/01DN3SK96XDAEKRB1AN30AAW6E",
Query: "subscription_sync_total{}",
MinTime: 1567641600000,
MaxTime: 1568851200000,
Step: 270 * time.Second,
ExpectedSeriesCount: 27951,
},
{
Name: "subs2w/01DN3SK96XDAEKRB1AN30AAW6E",
Query: "subscription_labels{}",
MinTime: 1567641600000,
MaxTime: 1568851200000,
Step: 270,
ExpectedSeriesCount: 536813,
},
{
Name: "subs2w1account/01DN3SK96XDAEKRB1AN30AAW6E",
Query: "subscription_labels{ebs_account=\"6274079\"}",
MinTime: 1567641600000,
MaxTime: 1568851200000,
Step: 270 * time.Second,
ExpectedSeriesCount: 260074, // 4-5s for just store.
},
{
Name: "subs15m1account/01DN3SK96XDAEKRB1AN30AAW6E",
Query: "subscription_labels{ebs_account=\"6274079\"}",
MinTime: 1568835900000,
MaxTime: 1568836800000,
Step: 270 * time.Second,
ExpectedSeriesCount: 16861, // 1-1.5s just store.
},
{
Name: "subs15s/01DN3SK96XDAEKRB1AN30AAW6E",
Query: "subscription_labels{}",
MaxTime: 1568851200000,
Step: 0, // Instant.
ExpectedSeriesCount: 1,
},
} {
t.Run(c.Name, func(t testutil.TB) {
t.Run("via proxy", func(t testutil.TB) {
benchQuery(t, engine, c, query.NewQueryableCreator(
logger,
nil,
store.NewProxyStore(logger, nil, func() []store.Client {
return []store.Client{query.NewInProcessClient(t, "Bucket store", storepb.ServerAsClient(st, 1), nil)}
},
component.Debug, nil, timeout),
1000000,
timeout,
))
})
t.Run("direct bucket", func(t testutil.TB) {
benchQuery(t, engine, c, query.NewQueryableCreator(logger, nil, st, 1000000, timeout))
})
})
}
}
func benchQuery(t testutil.TB, engine *promql.Engine, c queryTestCase, q query.QueryableCreator) {
qr := q(true, nil, nil, 0, false, false)
for i := 0; i < t.N(); i++ {
var query promql.Query
var err error
if c.Step == time.Duration(0) {
query, err = engine.NewInstantQuery(qr, c.Query, timestamp.Time(c.MaxTime))
} else {
query, err = engine.NewRangeQuery(qr, c.Query, timestamp.Time(c.MinTime), timestamp.Time(c.MaxTime), c.Step)
}
testutil.Ok(t, err)
r := query.Exec(context.Background())
testutil.Ok(t, r.Err)
testutil.Equals(t, 0, len(r.Warnings))
if !t.IsBenchmark() {
switch val := r.Value.(type) {
case promql.Matrix:
testutil.Equals(t, c.ExpectedSeriesCount, val.Len())
case promql.Vector:
testutil.Equals(t, c.ExpectedSeriesCount, 1)
case promql.Scalar:
testutil.Equals(t, c.ExpectedSeriesCount, 1)
}
}
}
}
func durationMilliseconds(d time.Duration) int64 {
return int64(d / (time.Millisecond / time.Nanosecond))
}
func timeMilliseconds(t time.Time) int64 {
return t.UnixNano() / int64(time.Millisecond/time.Nanosecond)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment