Skip to content

Instantly share code, notes, and snippets.

@SaveTheRbtz
Created March 1, 2023 06:45
Show Gist options
  • Save SaveTheRbtz/cae6e8abb9311e2016acbab6bc63752a to your computer and use it in GitHub Desktop.
Save SaveTheRbtz/cae6e8abb9311e2016acbab6bc63752a to your computer and use it in GitHub Desktop.
NumWorkers benchmark
diff --git a/go.mod b/go.mod
index 8948f139..824d984a 100644
--- a/go.mod
+++ b/go.mod
@@ -23,6 +23,7 @@ require (
cloud.google.com/go/compute/metadata v0.2.3 // indirect
github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect
github.com/envoyproxy/protoc-gen-validate v0.9.1 // indirect
+ golang.org/x/sync v0.1.0 // indirect
golang.org/x/text v0.6.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
)
diff --git a/go.sum b/go.sum
index 4fdfdcb9..a085a4bd 100644
--- a/go.sum
+++ b/go.sum
@@ -269,6 +269,8 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
+golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
diff --git a/server_test.go b/server_test.go
index 85a8f5bf..bd445876 100644
--- a/server_test.go
+++ b/server_test.go
@@ -24,10 +24,15 @@ import (
"reflect"
"strconv"
"strings"
+ "sync"
"testing"
"time"
+ "github.com/golang/protobuf/ptypes/empty"
"github.com/google/go-cmp/cmp"
+ "golang.org/x/sync/errgroup"
+ "google.golang.org/grpc/credentials/insecure"
+ "google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/status"
)
@@ -224,3 +229,123 @@ func BenchmarkChainStreamInterceptor(b *testing.B) {
})
}
}
+
+type emptySlowServiceServer interface {
+ SlowCall(context.Context, *empty.Empty) (*empty.Empty, error)
+}
+
+type testSlowServer struct {
+ maxAlloc int
+}
+
+var sumMu sync.Mutex
+var sum int
+
+func (s *testSlowServer) SlowCall(ctx context.Context, in *empty.Empty) (*empty.Empty, error) {
+ sz := 1 + grpcrand.Intn(s.maxAlloc)
+ array := make([]int, sz)
+
+ time.Sleep(time.Duration(sz))
+
+ // do not optimize this away
+ sumMu.Lock()
+ sum += array[sz-1]
+ sumMu.Unlock()
+
+ return &empty.Empty{}, nil
+}
+
+var testSd = ServiceDesc{
+ ServiceName: "grpc.testing.SlowService",
+ HandlerType: (*emptySlowServiceServer)(nil),
+ Methods: []MethodDesc{
+ {
+ MethodName: "SlowCall",
+ Handler: func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error) {
+ in := new(empty.Empty)
+ if err := dec(in); err != nil {
+ return nil, err
+ }
+ if interceptor == nil {
+ return srv.(emptySlowServiceServer).SlowCall(ctx, in)
+ }
+ info := &UnaryServerInfo{
+ Server: srv,
+ FullMethod: "/grpc.testing.SlowService/SlowCall",
+ }
+ handler := func(ctx context.Context, req interface{}) (interface{}, error) {
+ return srv.(emptySlowServiceServer).SlowCall(ctx, req.(*empty.Empty))
+ }
+ return interceptor(ctx, in, info, handler)
+ },
+ },
+ },
+}
+
+func BenchmarkWorkers(b *testing.B) {
+ testSlowServer := &testSlowServer{
+ maxAlloc: 100000,
+ }
+
+ for _, n := range []int{1, 3, 5, 10} {
+ n := n
+ b.Run(strconv.Itoa(n), func(b *testing.B) {
+ lis, err := net.Listen("tcp", "localhost:0")
+ if err != nil {
+ b.Fatalf("failed to create listener: %v", err)
+ }
+
+ s := NewServer(NumStreamWorkers(uint32(n)))
+ s.RegisterService(&testSd, testSlowServer)
+
+ connectCtx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+ defer cancel()
+
+ errCh := make(chan error, 1)
+ go func() {
+ err = s.Serve(lis)
+ errCh <- err
+ close(errCh)
+ }()
+
+ conn, err := DialContext(
+ connectCtx,
+ lis.Addr().String(),
+ WithTransportCredentials(insecure.NewCredentials()),
+ WithBlock(),
+ )
+ if err != nil {
+ b.Fatalf("failed to connect to: %s, err: %s", lis.Addr().String(), err)
+ }
+ defer conn.Close()
+
+ eg, ctx := errgroup.WithContext(context.Background())
+ eg.SetLimit(n)
+
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ eg.Go(func() error {
+ err := conn.Invoke(ctx, "/grpc.testing.SlowService/SlowCall", &empty.Empty{}, &empty.Empty{})
+ return err
+ })
+ }
+ err = eg.Wait()
+ if err != nil {
+ b.Fatal(err)
+ }
+ b.StopTimer()
+
+ s.Stop()
+
+ select {
+ case err = <-errCh:
+ if err != nil && err != ErrServerStopped {
+ b.Fatalf("server.Serve() error = %v, want %v", err, ErrServerStopped)
+ }
+ case <-time.After(5 * time.Second):
+ b.Fatal("server did not stop")
+ }
+ })
+ }
+}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment