Skip to content

Instantly share code, notes, and snippets.

@justinfx
Created September 3, 2024 05:12
Show Gist options
  • Save justinfx/d3d5b6aab2ccf8fc7a4d7fa4c10bfa59 to your computer and use it in GitHub Desktop.
Save justinfx/d3d5b6aab2ccf8fc7a4d7fa4c10bfa59 to your computer and use it in GitHub Desktop.
gRPC retryPolicy breaks when the server multiplexes gRPC and HTTP/2 (h2c) on the same port
package main
import (
"context"
"flag"
"log"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
pb "google.golang.org/grpc/examples/features/proto/echo"
)
// Command-line flags and the default service config used by the client.
var (
// addr is the host:port of the echo server to dial.
addr = flag.String("addr", "localhost:50052", "the address to connect to")
// retryPolicy is a gRPC service-config JSON document. It selects the
// round_robin load-balancing policy and defines one methodConfig entry that
// names the empty service (presumably the default for every method — confirm
// against the gRPC service config docs). That entry sets waitForReady, a 30s
// timeout, and a retry policy with MaxAttempts 4, backoff starting at 0.1s,
// multiplied by 5.0 and capped at 5s, for the status codes UNAVAILABLE,
// RESOURCE_EXHAUSTED, UNKNOWN and INTERNAL.
retryPolicy = `{
"loadBalancingConfig": [ { "round_robin": {} } ],
"methodConfig": [{
"name": [{"service": ""}],
"waitForReady": true,
"timeout": "30s",
"retryPolicy": {
"MaxAttempts": 4,
"InitialBackoff": "0.1s",
"MaxBackoff": "5s",
"BackoffMultiplier": 5.0,
"RetryableStatusCodes": [ "UNAVAILABLE", "RESOURCE_EXHAUSTED", "UNKNOWN", "INTERNAL" ]
}
}]}`
)
// main dials the echo server at *addr using insecure credentials and the
// retryPolicy default service config, issues a single UnaryEcho call and logs
// the reply.
//
// Cleanup is done explicitly instead of with defer: log.Fatalf terminates the
// process through os.Exit, which does not run deferred calls, so a deferred
// cancel/conn.Close would be skipped on every failure path.
func main() {
	flag.Parse()

	conn, err := grpc.NewClient(*addr,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultServiceConfig(retryPolicy),
	)
	if err != nil {
		// Nothing has been acquired yet, so exiting here leaks nothing.
		log.Fatalf("did not connect: %v", err)
	}

	// The deadline bounds the UnaryEcho call; release the context's resources
	// as soon as the call has returned.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	reply, err := pb.NewEchoClient(conn).UnaryEcho(ctx, &pb.EchoRequest{Message: "Try and Success"})
	cancel()

	if err == nil {
		log.Printf("UnaryEcho reply: %v", reply)
	}

	// Close the connection before any fatal exit so it is shut down on both
	// the success and the failure path.
	if cerr := conn.Close(); cerr != nil {
		log.Printf("failed to close connection: %s", cerr)
	}
	if err != nil {
		log.Fatalf("UnaryEcho error: %v", err)
	}
}
package main
import (
"context"
"flag"
"fmt"
"log"
"net"
"net/http"
"strings"
"sync/atomic"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
pb "google.golang.org/grpc/examples/features/proto/echo"
)
// Command-line flags for the h2c-mux variant of the test server.
var (
// port is the TCP port the server listens on.
port = flag.Int("port", 50052, "port number")
// doMux, when set, makes main serve through an http.Server whose handler
// routes between an HTTP mux and the gRPC server; when unset the gRPC server
// serves the listener directly.
doMux = flag.Bool("mux", false, "do grpc/h2c mux")
)
// failingServer is an Echo service implementation whose UnaryEcho method
// fails every call except each fourth one, giving a client retry policy
// something to retry against.
type failingServer struct {
// Embedded so the type provides the rest of the Echo service's methods
// without implementing them here.
pb.UnimplementedEchoServer
// reqCounter counts UnaryEcho invocations; it is incremented atomically on
// every call.
reqCounter atomic.Int64
}
// UnaryEcho bumps the request counter and echoes req.Message back only when
// the new count is a multiple of four. Every other call is logged as failed
// and answered with a codes.Internal status carrying a fixed test message.
func (s *failingServer) UnaryEcho(_ context.Context, req *pb.EchoRequest) (*pb.EchoResponse, error) {
	count := s.reqCounter.Add(1)

	// Every fourth request is the one that is allowed through.
	if count%4 == 0 {
		log.Println("request succeeded count:", count)
		return &pb.EchoResponse{Message: req.Message}, nil
	}

	log.Println("request failed count:", count)
	return nil, status.Errorf(codes.Internal, "TEST: Received RST_STREAM with error code 0")
}
// main listens on the configured TCP port, registers failingServer as the
// Echo service and serves until an error occurs.
//
// With -mux the listener is served by an http.Server whose handler is the
// combined HTTP/gRPC router wrapped by h2c.NewHandler; without it the gRPC
// server serves the listener directly.
func main() {
	flag.Parse()

	address := fmt.Sprintf(":%v", *port)
	lis, err := net.Listen("tcp", address)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	log.Println("listen on address", address)

	grpcServ := grpc.NewServer()
	pb.RegisterEchoServer(grpcServ, &failingServer{})

	var serveErr error
	if *doMux {
		log.Println("Using muxing")
		// gRPC requests are dispatched to grpcServ through its http.Handler
		// side; everything else falls through to an (empty) ServeMux.
		router := newHTTPandGRPCMux(http.NewServeMux(), grpcServ)
		httpServ := &http.Server{Handler: h2c.NewHandler(router, &http2.Server{})}
		serveErr = httpServ.Serve(lis)
	} else {
		log.Println("Not using muxing")
		serveErr = grpcServ.Serve(lis)
	}

	if serveErr != nil {
		log.Fatalf("failed to serve: %v", serveErr)
	}
}
// newHTTPandGRPCMux returns a handler that picks one of two handlers per
// request: grpcHandler when the request is HTTP/2 (ProtoMajor == 2) and its
// content-type header starts with "application/grpc", httpHand otherwise.
func newHTTPandGRPCMux(httpHand http.Handler, grpcHandler http.Handler) http.Handler {
	route := func(w http.ResponseWriter, r *http.Request) {
		// Default to the plain HTTP handler and switch only for gRPC traffic.
		target := httpHand
		isGRPC := strings.HasPrefix(r.Header.Get("content-type"), "application/grpc")
		if r.ProtoMajor == 2 && isGRPC {
			target = grpcHandler
		}
		target.ServeHTTP(w, r)
	}
	return http.HandlerFunc(route)
}
package main
import (
"context"
"flag"
"fmt"
"log"
"net"
"net/http"
"sync/atomic"
"github.com/soheilhy/cmux"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
pb "google.golang.org/grpc/examples/features/proto/echo"
)
// Command-line flags for the cmux variant of the test server.
var (
// port is the TCP port the server listens on.
port = flag.Int("port", 50052, "port number")
// doMux, when set, makes main split the listener with cmux into a gRPC
// listener and an HTTP/1 listener; when unset the gRPC server serves the
// listener directly.
doMux = flag.Bool("mux", false, "do grpc/h2c mux")
)
// failingServer is an Echo service implementation whose UnaryEcho method
// fails every call except each fourth one, giving a client retry policy
// something to retry against.
type failingServer struct {
// Embedded so the type provides the rest of the Echo service's methods
// without implementing them here.
pb.UnimplementedEchoServer
// reqCounter counts UnaryEcho invocations; it is incremented atomically on
// every call.
reqCounter atomic.Int64
}
// UnaryEcho increments the request counter and fails with a codes.Internal
// status unless the resulting count is divisible by four, in which case the
// request message is echoed back. Both outcomes are logged with the count.
func (s *failingServer) UnaryEcho(_ context.Context, req *pb.EchoRequest) (*pb.EchoResponse, error) {
	seq := s.reqCounter.Add(1)

	switch seq % 4 {
	case 0:
		// Only every fourth request succeeds.
		log.Println("request succeeded count:", seq)
		return &pb.EchoResponse{Message: req.Message}, nil
	default:
		log.Println("request failed count:", seq)
		return nil, status.Errorf(codes.Internal, "TEST: Received RST_STREAM with error code 0")
	}
}
// main listens on the configured TCP port, registers failingServer as the
// Echo service and serves until an error occurs.
//
// Without -mux the gRPC server serves the listener directly. With -mux the
// listener is split by cmux: connections matched by the HTTP/2 content-type
// prefix "application/grpc" go to the gRPC server, HTTP/1 connections go to a
// small hello-world HTTP server, and each server runs in its own goroutine
// while m.Serve blocks the main goroutine.
func main() {
	flag.Parse()

	address := fmt.Sprintf(":%v", *port)
	lis, err := net.Listen("tcp", address)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	log.Println("listen on address", address)

	grpcServ := grpc.NewServer()
	pb.RegisterEchoServer(grpcServ, &failingServer{})

	// Plain gRPC path: no connection-level multiplexing.
	if !*doMux {
		log.Println("Not using muxing")
		if err := grpcServ.Serve(lis); err != nil {
			log.Fatalf("failed to serve: %v", err)
		}
		return
	}

	log.Println("Using muxing")
	m := cmux.New(lis)
	// Matchers are registered in this order: gRPC first, then HTTP/1.
	grpcL := m.MatchWithWriters(
		cmux.HTTP2MatchHeaderFieldPrefixSendSettings("content-type", "application/grpc"))
	httpL := m.Match(cmux.HTTP1())

	httpMux := http.NewServeMux()
	httpMux.HandleFunc("/", func(w http.ResponseWriter, _ *http.Request) {
		fmt.Fprintf(w, "hello from http handler!\n")
	})
	http1Server := &http.Server{Handler: httpMux}

	// Each backend serves its own sub-listener; any failure is fatal.
	go func() {
		if serveErr := grpcServ.Serve(grpcL); serveErr != nil {
			log.Fatalf("failed to start grpc: %v", serveErr)
		}
	}()
	go func() {
		if serveErr := http1Server.Serve(httpL); serveErr != nil {
			log.Fatalf("failed to start http: %v", serveErr)
		}
	}()

	if err := m.Serve(); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment