// SPDX-License-Identifier: AGPL-3.0-only
// Provenance-includes-location: https://github.com/cortexproject/cortex/blob/master/pkg/frontend/v2/frontend_test.go
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Cortex Authors.
package v2

import (
	"context"
	"fmt"
	"net"
	"net/http"
	"os"
	"runtime"
	"strconv"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/go-kit/log"
	"github.com/grafana/dskit/flagext"
	"github.com/grafana/dskit/services"
	"github.com/grafana/dskit/test"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"github.com/weaveworks/common/httpgrpc"
	"github.com/weaveworks/common/user"
	"go.uber.org/atomic"
	"google.golang.org/grpc"
	"google.golang.org/grpc/keepalive"

	"github.com/grafana/mimir/pkg/frontend/v2/frontendv2pb"
	"github.com/grafana/mimir/pkg/querier/stats"
	"github.com/grafana/mimir/pkg/scheduler/schedulerdiscovery"
	"github.com/grafana/mimir/pkg/scheduler/schedulerpb"
	"github.com/grafana/mimir/pkg/util/servicediscovery"
)
const testFrontendWorkerConcurrency = 5
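
// setupFrontend starts a query-frontend connected to an in-process mockScheduler,
// using the default test worker concurrency (testFrontendWorkerConcurrency).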
func setupFrontend(t *testing.T, reg prometheus.Registerer, schedulerReplyFunc func(f *Frontend, msg *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend) (*Frontend, *mockScheduler) {
	return setupFrontendWithConcurrencyAndServerOptions(t, reg, schedulerReplyFunc, testFrontendWorkerConcurrency)
}
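
// setupFrontendWithConcurrencyAndServerOptions starts a Frontend together with a gRPC server
// hosting both the frontend-for-querier service and a mockScheduler, then waits until the mock
// scheduler has seen the frontend register (its INIT message). Both are stopped via t.Cleanup.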
func setupFrontendWithConcurrencyAndServerOptions(t *testing.T, reg prometheus.Registerer, schedulerReplyFunc func(f *Frontend, msg *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend, concurrency int, opts ...grpc.ServerOption) (*Frontend, *mockScheduler) {
	l, err := net.Listen("tcp", "")
	require.NoError(t, err)

	server := grpc.NewServer(opts...)

	h, p, err := net.SplitHostPort(l.Addr().String())
	require.NoError(t, err)

	grpcPort, err := strconv.Atoi(p)
	require.NoError(t, err)

	cfg := Config{}
	flagext.DefaultValues(&cfg)
	cfg.SchedulerAddress = l.Addr().String()
	cfg.WorkerConcurrency = concurrency
	cfg.Addr = h
	cfg.Port = grpcPort

	logger := log.NewLogfmtLogger(os.Stdout)
	//logger := log.NewNopLogger()
	f, err := NewFrontend(cfg, logger, reg)
	require.NoError(t, err)

	frontendv2pb.RegisterFrontendForQuerierServer(server, f)

	ms := newMockScheduler(t, f, schedulerReplyFunc)
	schedulerpb.RegisterSchedulerForFrontendServer(server, ms)

	require.NoError(t, services.StartAndAwaitRunning(context.Background(), f))
	t.Cleanup(func() {
		_ = services.StopAndAwaitTerminated(context.Background(), f)
	})

	go func() {
		_ = server.Serve(l)
	}()
	t.Cleanup(func() {
		_ = l.Close()
		server.GracefulStop()
	})

	// Wait for frontend to connect to scheduler.
	test.Poll(t, 1*time.Second, 1, func() interface{} {
		ms.mu.Lock()
		defer ms.mu.Unlock()
		return len(ms.frontendAddr)
	})

	return f, ms
}
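
// sendResponseWithDelay sends a query result back to the frontend through its QueryResult RPC
// handler, optionally sleeping first so the frontend has time to start waiting for the response.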
func sendResponseWithDelay(f *Frontend, delay time.Duration, userID string, queryID uint64, resp *httpgrpc.HTTPResponse) {
	if delay > 0 {
		time.Sleep(delay)
	}

	ctx := user.InjectOrgID(context.Background(), userID)
	_, _ = f.QueryResult(ctx, &frontendv2pb.QueryResultRequest{
		QueryID:      queryID,
		HttpResponse: resp,
		Stats:        &stats.Stats{},
	})
}

func TestFrontendBasicWorkflow(t *testing.T) {
	const (
		body   = "all fine here"
		userID = "test"
	)

	f, _ := setupFrontend(t, nil, func(f *Frontend, msg *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend {
		// We cannot call QueryResult directly, as Frontend is not yet waiting for the response.
		// It first needs to be told that enqueuing has succeeded.
		go sendResponseWithDelay(f, 100*time.Millisecond, userID, msg.QueryID, &httpgrpc.HTTPResponse{
			Code: 200,
			Body: []byte(body),
		})
		return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK}
	})

	resp, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), &httpgrpc.HTTPRequest{})
	require.NoError(t, err)
	require.Equal(t, int32(200), resp.Code)
	require.Equal(t, []byte(body), resp.Body)
}

func TestFrontendRequestsPerWorkerMetric(t *testing.T) {
	const (
		body   = "all fine here"
		userID = "test"
	)

	reg := prometheus.NewRegistry()

	f, _ := setupFrontend(t, reg, func(f *Frontend, msg *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend {
		// We cannot call QueryResult directly, as Frontend is not yet waiting for the response.
		// It first needs to be told that enqueuing has succeeded.
		go sendResponseWithDelay(f, 100*time.Millisecond, userID, msg.QueryID, &httpgrpc.HTTPResponse{
			Code: 200,
			Body: []byte(body),
		})
		return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK}
	})

	expectedMetrics := fmt.Sprintf(`
# HELP cortex_query_frontend_workers_enqueued_requests_total Total number of requests enqueued by each query frontend worker (regardless of the result), labeled by scheduler address.
# TYPE cortex_query_frontend_workers_enqueued_requests_total counter
cortex_query_frontend_workers_enqueued_requests_total{scheduler_address="%s"} 0
`, f.cfg.SchedulerAddress)
	require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), "cortex_query_frontend_workers_enqueued_requests_total"))

	resp, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), &httpgrpc.HTTPRequest{})
	require.NoError(t, err)
	require.Equal(t, int32(200), resp.Code)
	require.Equal(t, []byte(body), resp.Body)

	expectedMetrics = fmt.Sprintf(`
# HELP cortex_query_frontend_workers_enqueued_requests_total Total number of requests enqueued by each query frontend worker (regardless of the result), labeled by scheduler address.
# TYPE cortex_query_frontend_workers_enqueued_requests_total counter
cortex_query_frontend_workers_enqueued_requests_total{scheduler_address="%s"} 1
`, f.cfg.SchedulerAddress)
	require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), "cortex_query_frontend_workers_enqueued_requests_total"))

	// Manually remove the address, check that label is removed.
	f.schedulerWorkers.InstanceRemoved(servicediscovery.Instance{Address: f.cfg.SchedulerAddress, InUse: true})

	expectedMetrics = ``
	require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), "cortex_query_frontend_workers_enqueued_requests_total"))
}

func TestFrontendRetryEnqueue(t *testing.T) {
	// The frontend uses worker concurrency to compute the number of retries. We use one failure fewer than that.
	failures := atomic.NewInt64(testFrontendWorkerConcurrency - 1)
	const (
		body   = "hello world"
		userID = "test"
	)

	f, _ := setupFrontend(t, nil, func(f *Frontend, msg *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend {
		fail := failures.Dec()
		if fail >= 0 {
			return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.SHUTTING_DOWN}
		}

		go sendResponseWithDelay(f, 100*time.Millisecond, userID, msg.QueryID, &httpgrpc.HTTPResponse{
			Code: 200,
			Body: []byte(body),
		})
		return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK}
	})

	_, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), &httpgrpc.HTTPRequest{})
	require.NoError(t, err)
}

func TestFrontendTooManyRequests(t *testing.T) {
	f, _ := setupFrontend(t, nil, func(f *Frontend, msg *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend {
		return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.TOO_MANY_REQUESTS_PER_TENANT}
	})

	resp, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), "test"), &httpgrpc.HTTPRequest{})
	require.NoError(t, err)
	require.Equal(t, int32(http.StatusTooManyRequests), resp.Code)
}

func TestFrontendEnqueueFailure(t *testing.T) {
	f, _ := setupFrontend(t, nil, func(f *Frontend, msg *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend {
		return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.SHUTTING_DOWN}
	})

	_, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), "test"), &httpgrpc.HTTPRequest{})
	require.Error(t, err)
	require.True(t, strings.Contains(err.Error(), "failed to enqueue request"))
}

func TestFrontendCancellation(t *testing.T) {
	f, ms := setupFrontend(t, nil, nil)

	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()

	resp, err := f.RoundTripGRPC(user.InjectOrgID(ctx, "test"), &httpgrpc.HTTPRequest{})
	require.EqualError(t, err, context.DeadlineExceeded.Error())
	require.Nil(t, resp)

	// We wait a bit to make sure the scheduler receives the cancellation request.
	test.Poll(t, time.Second, 2, func() interface{} {
		ms.mu.Lock()
		defer ms.mu.Unlock()
		return len(ms.msgs)
	})

	ms.checkWithLock(func() {
		require.Equal(t, 2, len(ms.msgs))
		require.True(t, ms.msgs[0].Type == schedulerpb.ENQUEUE)
		require.True(t, ms.msgs[1].Type == schedulerpb.CANCEL)
		require.True(t, ms.msgs[0].QueryID == ms.msgs[1].QueryID)
	})
}

// When the frontend worker that processed the request is busy (processing a new request or cancelling a previous one),
// we still need to make sure that the cancellation reaches the scheduler at some point.
// Issue: https://github.com/grafana/mimir/issues/740
func TestFrontendWorkerCancellation(t *testing.T) {
	f, ms := setupFrontend(t, nil, nil)

	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()

	// Send more requests than the frontend worker concurrency, so that all frontend workers stay busy serving requests.
	reqCount := testFrontendWorkerConcurrency + 5
	var wg sync.WaitGroup
	for i := 0; i < reqCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			resp, err := f.RoundTripGRPC(user.InjectOrgID(ctx, "test"), &httpgrpc.HTTPRequest{})
			require.EqualError(t, err, context.DeadlineExceeded.Error())
			require.Nil(t, resp)
		}()
	}

	wg.Wait()

	// We wait a bit to make sure the scheduler receives the cancellation requests.
	// 2 * reqCount because for every request there should also be a corresponding cancel request.
	test.Poll(t, 5*time.Second, 2*reqCount, func() interface{} {
		ms.mu.Lock()
		defer ms.mu.Unlock()
		return len(ms.msgs)
	})

	ms.checkWithLock(func() {
		require.Equal(t, 2*reqCount, len(ms.msgs))
		msgTypeCounts := map[schedulerpb.FrontendToSchedulerType]int{}
		for _, msg := range ms.msgs {
			msgTypeCounts[msg.Type]++
		}
		expectedMsgTypeCounts := map[schedulerpb.FrontendToSchedulerType]int{
			schedulerpb.ENQUEUE: reqCount,
			schedulerpb.CANCEL:  reqCount,
		}
		require.Equalf(t, expectedMsgTypeCounts, msgTypeCounts,
			"Should receive %d enqueue (%d) requests, and %d cancel (%d) requests.", reqCount, schedulerpb.ENQUEUE, reqCount, schedulerpb.CANCEL,
		)
	})
}

func TestFrontendFailedCancellation(t *testing.T) {
	f, ms := setupFrontend(t, nil, nil)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go func() {
		time.Sleep(100 * time.Millisecond)

		// Stop the scheduler workers.
		addr := ""
		f.schedulerWorkers.mu.Lock()
		for k := range f.schedulerWorkers.workers {
			addr = k
			break
		}
		f.schedulerWorkers.mu.Unlock()

		f.schedulerWorkers.InstanceRemoved(servicediscovery.Instance{Address: addr, InUse: true})

		// Wait for worker goroutines to stop.
		time.Sleep(100 * time.Millisecond)

		// Cancel the request. The frontend will try to send the cancellation to the scheduler, but that will fail (not visible to the user).
		// Everything else should still work fine.
		cancel()
	}()

	// Send the request.
	resp, err := f.RoundTripGRPC(user.InjectOrgID(ctx, "test"), &httpgrpc.HTTPRequest{})
	require.EqualError(t, err, context.Canceled.Error())
	require.Nil(t, resp)

	ms.checkWithLock(func() {
		require.Equal(t, 1, len(ms.msgs))
	})
}
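
// mockScheduler is a test implementation of the scheduler's frontend-facing gRPC service.
// It counts INIT registrations per frontend address, records every subsequent message,
// and replies with OK unless replyFunc provides a different reply.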
type mockScheduler struct {
	t *testing.T
	f *Frontend

	replyFunc func(f *Frontend, msg *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend

	mu           sync.Mutex
	frontendAddr map[string]int
	msgs         []*schedulerpb.FrontendToScheduler
}

func newMockScheduler(t *testing.T, f *Frontend, replyFunc func(f *Frontend, msg *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend) *mockScheduler {
	return &mockScheduler{t: t, f: f, frontendAddr: map[string]int{}, replyFunc: replyFunc}
}

// checkWithLock runs fn while holding the mock scheduler's mutex, so test assertions can read its state safely.
func (m *mockScheduler) checkWithLock(fn func()) {
	m.mu.Lock()
	defer m.mu.Unlock()

	fn()
}
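
// FrontendLoop handles the stream opened by a frontend worker: it acks the initial INIT message,
// then records each received message and replies with either OK or the result of replyFunc.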
func (m *mockScheduler) FrontendLoop(frontend schedulerpb.SchedulerForFrontend_FrontendLoopServer) error {
	init, err := frontend.Recv()
	if err != nil {
		return err
	}

	m.mu.Lock()
	m.frontendAddr[init.FrontendAddress]++
	m.mu.Unlock()

	// Ack INIT from frontend.
	if err := frontend.Send(&schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK}); err != nil {
		return err
	}

	for {
		msg, err := frontend.Recv()
		if err != nil {
			return err
		}

		m.mu.Lock()
		m.msgs = append(m.msgs, msg)
		m.mu.Unlock()

		reply := &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK}
		if m.replyFunc != nil {
			reply = m.replyFunc(m.f, msg)
		}

		if err := frontend.Send(reply); err != nil {
			return err
		}
	}
}

func TestConfig_Validate(t *testing.T) {
	tests := map[string]struct {
		setup       func(cfg *Config)
		expectedErr string
	}{
		"should pass with default config": {
			setup: func(cfg *Config) {},
		},
		"should pass if scheduler address is configured, and query-scheduler discovery mode is the default one": {
			setup: func(cfg *Config) {
				cfg.SchedulerAddress = "localhost:9095"
			},
		},
		"should fail if query-scheduler service discovery is set to ring, and scheduler address is configured": {
			setup: func(cfg *Config) {
				cfg.QuerySchedulerDiscovery.Mode = schedulerdiscovery.ModeRing
				cfg.SchedulerAddress = "localhost:9095"
			},
			expectedErr: `scheduler address cannot be specified when query-scheduler service discovery mode is set to 'ring'`,
		},
	}

	for testName, testData := range tests {
		t.Run(testName, func(t *testing.T) {
			cfg := Config{}
			flagext.DefaultValues(&cfg)
			testData.setup(&cfg)

			actualErr := cfg.Validate(log.NewNopLogger())
			if testData.expectedErr == "" {
				require.NoError(t, actualErr)
			} else {
				require.Error(t, actualErr)
				assert.ErrorContains(t, actualErr, testData.expectedErr)
			}
		})
	}
}
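
// TestWithClosingGrpcServer checks that frontend workers do not leak gRPC client-stream goroutines
// when the server side closes connections due to aggressive keepalive settings.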
func TestWithClosingGrpcServer(t *testing.T) {
	// This test is easier with a single frontend worker.
	const frontendConcurrency = 1
	const userID = "test"

	f, _ := setupFrontendWithConcurrencyAndServerOptions(t, nil, func(f *Frontend, msg *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend {
		return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.TOO_MANY_REQUESTS_PER_TENANT}
	}, frontendConcurrency, grpc.KeepaliveParams(keepalive.ServerParameters{
		MaxConnectionIdle:     100 * time.Millisecond,
		MaxConnectionAge:      100 * time.Millisecond,
		MaxConnectionAgeGrace: 100 * time.Millisecond,
		Time:                  1 * time.Second,
		Timeout:               1 * time.Second,
	}))

	// The connection will be established on the first roundtrip.
	resp, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), &httpgrpc.HTTPRequest{})
	require.NoError(t, err)
	require.Equal(t, int(resp.Code), http.StatusTooManyRequests)

	// Verify that there is one stream open.
	require.Equal(t, 1, checkStreamGoroutines())

	// Wait a bit to make sure that the server closes the connection.
	time.Sleep(1 * time.Second)

	// Despite the server closing connections, stream-related goroutines still exist.
	require.Equal(t, 1, checkStreamGoroutines())

	// Another request will work as before, because the worker will recreate the connection.
	resp, err = f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), &httpgrpc.HTTPRequest{})
	require.NoError(t, err)
	require.Equal(t, int(resp.Code), http.StatusTooManyRequests)

	// There should still be only one stream open, and one goroutine created for it.
	// Previously the frontend leaked goroutines, because a stream that received "EOF" (due to the
	// server closing the connection) never stopped its goroutine.
	require.Equal(t, 1, checkStreamGoroutines())
}
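
// createdBy is the marker searched for in goroutine stack dumps to count gRPC client-stream goroutines.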
const createdBy = "created by google.golang.org/grpc.newClientStreamWithParams"

// checkStreamGoroutines dumps all goroutine stacks and counts how many were created by
// gRPC's newClientStreamWithParams, i.e. how many client-stream goroutines currently exist.
func checkStreamGoroutines() int {
	buf := make([]byte, 1000000)
	stacklen := runtime.Stack(buf, true)

	str := string(buf[:stacklen])
	count := 0
	for len(str) > 0 {
		ix := strings.Index(str, createdBy)
		if ix < 0 {
			break
		}
		count++
		str = str[ix+len(createdBy):]
	}
	return count
}