Created
September 3, 2024 05:12
-
-
Save justinfx/d3d5b6aab2ccf8fc7a4d7fa4c10bfa59 to your computer and use it in GitHub Desktop.
gRPC retryPolicy breaks when the server multiplexes gRPC and HTTP/2 (h2c) on the same port
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"flag" | |
"log" | |
"time" | |
"google.golang.org/grpc" | |
"google.golang.org/grpc/credentials/insecure" | |
pb "google.golang.org/grpc/examples/features/proto/echo" | |
) | |
// Client configuration.
var (
	// addr is the "-addr" flag: the server address the client dials.
	addr = flag.String("addr", "localhost:50052", "the address to connect to")

	// retryPolicy is the service config JSON handed to
	// grpc.WithDefaultServiceConfig. It selects round_robin load balancing
	// and, for the method config named with an empty service, enables
	// waitForReady, a 30s timeout, and up to 4 attempts with backoff between
	// 0.1s and 5s (multiplier 5.0) for the listed status codes.
	retryPolicy = `{
"loadBalancingConfig": [ { "round_robin": {} } ],
"methodConfig": [{
"name": [{"service": ""}],
"waitForReady": true,
"timeout": "30s",
"retryPolicy": {
"MaxAttempts": 4,
"InitialBackoff": "0.1s",
"MaxBackoff": "5s",
"BackoffMultiplier": 5.0,
"RetryableStatusCodes": [ "UNAVAILABLE", "RESOURCE_EXHAUSTED", "UNKNOWN", "INTERNAL" ]
}
}]}`
)
func main() { | |
flag.Parse() | |
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) | |
defer cancel() | |
conn, err := grpc.NewClient(*addr, | |
grpc.WithTransportCredentials(insecure.NewCredentials()), | |
grpc.WithDefaultServiceConfig(retryPolicy), | |
) | |
if err != nil { | |
log.Fatalf("did not connect: %v", err) | |
} | |
defer func() { | |
if e := conn.Close(); e != nil { | |
log.Printf("failed to close connection: %s", e) | |
} | |
}() | |
c := pb.NewEchoClient(conn) | |
reply, err := c.UnaryEcho(ctx, &pb.EchoRequest{Message: "Try and Success"}) | |
if err != nil { | |
log.Fatalf("UnaryEcho error: %v", err) | |
} | |
log.Printf("UnaryEcho reply: %v", reply) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"flag" | |
"fmt" | |
"log" | |
"net" | |
"net/http" | |
"strings" | |
"sync/atomic" | |
"golang.org/x/net/http2" | |
"golang.org/x/net/http2/h2c" | |
"google.golang.org/grpc" | |
"google.golang.org/grpc/codes" | |
"google.golang.org/grpc/status" | |
pb "google.golang.org/grpc/examples/features/proto/echo" | |
) | |
// Command-line flags for the server.
var (
	// port is the "-port" flag: the TCP port the server listens on.
	port = flag.Int("port", 50052, "port number")

	// doMux is the "-mux" flag: when set, main serves through the combined
	// HTTP/gRPC handler instead of calling the gRPC server's Serve directly.
	doMux = flag.Bool("mux", false, "do grpc/h2c mux")
)
type failingServer struct { | |
pb.UnimplementedEchoServer | |
reqCounter atomic.Int64 | |
} | |
func (s *failingServer) UnaryEcho(_ context.Context, req *pb.EchoRequest) (*pb.EchoResponse, error) { | |
n := s.reqCounter.Add(1) | |
if n%4 != 0 { | |
log.Println("request failed count:", n) | |
return nil, status.Errorf(codes.Internal, "TEST: Received RST_STREAM with error code 0") | |
} | |
log.Println("request succeeded count:", n) | |
return &pb.EchoResponse{Message: req.Message}, nil | |
} | |
func main() { | |
flag.Parse() | |
address := fmt.Sprintf(":%v", *port) | |
lis, err := net.Listen("tcp", address) | |
if err != nil { | |
log.Fatalf("failed to listen: %v", err) | |
} | |
log.Println("listen on address", address) | |
grpcServ := grpc.NewServer() | |
pb.RegisterEchoServer(grpcServ, &failingServer{}) | |
serve := grpcServ.Serve | |
if *doMux { | |
log.Println("Using muxing") | |
httpMux := http.NewServeMux() | |
mixedHandler := newHTTPandGRPCMux(httpMux, grpcServ) | |
http2Server := &http2.Server{} | |
http1Server := &http.Server{Handler: h2c.NewHandler(mixedHandler, http2Server)} | |
serve = http1Server.Serve | |
} else { | |
log.Println("Not using muxing") | |
} | |
if err := serve(lis); err != nil { | |
log.Fatalf("failed to serve: %v", err) | |
} | |
} | |
// newHTTPandGRPCMux returns a handler that dispatches each request to
// grpcHandler when the request is HTTP/2 (ProtoMajor == 2) and its
// Content-Type header starts with "application/grpc"; every other request
// goes to httpHand.
func newHTTPandGRPCMux(httpHand http.Handler, grpcHandler http.Handler) http.Handler {
	dispatch := func(w http.ResponseWriter, r *http.Request) {
		target := httpHand
		if r.ProtoMajor == 2 && strings.HasPrefix(r.Header.Get("content-type"), "application/grpc") {
			target = grpcHandler
		}
		target.ServeHTTP(w, r)
	}
	return http.HandlerFunc(dispatch)
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"flag" | |
"fmt" | |
"log" | |
"net" | |
"net/http" | |
"sync/atomic" | |
"github.com/soheilhy/cmux" | |
"google.golang.org/grpc" | |
"google.golang.org/grpc/codes" | |
"google.golang.org/grpc/status" | |
pb "google.golang.org/grpc/examples/features/proto/echo" | |
) | |
// Command-line flags for the server.
var (
	// port is the "-port" flag: the TCP port the server listens on.
	port = flag.Int("port", 50052, "port number")

	// doMux is the "-mux" flag: when set, main splits the listener with cmux
	// instead of handing it straight to the gRPC server.
	doMux = flag.Bool("mux", false, "do grpc/h2c mux")
)
type failingServer struct { | |
pb.UnimplementedEchoServer | |
reqCounter atomic.Int64 | |
} | |
func (s *failingServer) UnaryEcho(_ context.Context, req *pb.EchoRequest) (*pb.EchoResponse, error) { | |
n := s.reqCounter.Add(1) | |
if n%4 != 0 { | |
log.Println("request failed count:", n) | |
return nil, status.Errorf(codes.Internal, "TEST: Received RST_STREAM with error code 0") | |
} | |
log.Println("request succeeded count:", n) | |
return &pb.EchoResponse{Message: req.Message}, nil | |
} | |
func main() { | |
flag.Parse() | |
address := fmt.Sprintf(":%v", *port) | |
lis, err := net.Listen("tcp", address) | |
if err != nil { | |
log.Fatalf("failed to listen: %v", err) | |
} | |
log.Println("listen on address", address) | |
grpcServ := grpc.NewServer() | |
pb.RegisterEchoServer(grpcServ, &failingServer{}) | |
serve := grpcServ.Serve | |
if *doMux { | |
log.Println("Using muxing") | |
m := cmux.New(lis) | |
grpcL := m.MatchWithWriters( | |
cmux.HTTP2MatchHeaderFieldPrefixSendSettings("content-type", "application/grpc")) | |
httpL := m.Match(cmux.HTTP1()) | |
httpMux := http.NewServeMux() | |
httpMux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { | |
fmt.Fprintf(w, "hello from http handler!\n") | |
}) | |
http1Server := &http.Server{Handler: httpMux} | |
go func() { | |
if err := grpcServ.Serve(grpcL); err != nil { | |
log.Fatalf("failed to start grpc: %v", err) | |
} | |
}() | |
go func() { | |
if err := http1Server.Serve(httpL); err != nil { | |
log.Fatalf("failed to start http: %v", err) | |
} | |
}() | |
if err := m.Serve(); err != nil { | |
log.Fatalf("failed to serve: %v", err) | |
} | |
return | |
} | |
log.Println("Not using muxing") | |
if err := serve(lis); err != nil { | |
log.Fatalf("failed to serve: %v", err) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment