Skip to content

Instantly share code, notes, and snippets.

@changkun
Forked from akhenakh/app.yaml
Created November 23, 2020 09:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save changkun/98d77c3025fb93ae17fd4528a8180787 to your computer and use it in GitHub Desktop.
Save changkun/98d77c3025fb93ae17fd4528a8180787 to your computer and use it in GitHub Desktop.
Example of graceful shutdown with grpc healthserver * httpserver
readinessProbe:
exec:
command: ["/root/grpc_health_probe", "-addr=:6666"]
initialDelaySeconds: 1
livenessProbe:
exec:
command: ["/root/grpc_health_probe", "-addr=:6666"]
initialDelaySeconds: 2
imagePullPolicy: IfNotPresent
package main
import (
"context"
"fmt"
stdlog "log"
"net"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"github.com/go-kit/kit/log/level"
"github.com/namsral/flag"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/keepalive"
)
const serviceName = "myserviced"
var (
version = "no version"
httpPort = flag.Int("httpPort", 8888, "http port")
grpcPort = flag.Int("grpcPort", 9200, "grpc port")
healthPort = flag.Int("healthPort", 6666, "grpc health port")
logLevel = flag.String("logLevel", "INFO", "Log level, INFO|WARNING|DEBUG|ERROR")
gcpProjectID = flag.String("gcpProjectID", "none", "GCP Project ID")
dev = flag.Bool("dev", false, "Enable development tooling")
grpcServer *grpc.Server
grpcHealthServer *grpc.Server
httpServer *http.Server
)
func main() {
flag.Parse()
logger, err := probe.InitLogger(serviceName, *logLevel)
if err != nil {
stdlog.Fatal("can't init logger", err)
}
level.Info(logger).Log("msg", "Starting app", "version", version)
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt, syscall.SIGTERM)
defer signal.Stop(interrupt)
g, ctx := errgroup.WithContext(ctx)
// web server metrics
g.Go(func() error {
httpServer = &http.Server{
Addr: fmt.Sprintf(":%d", *httpPort),
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
}
level.Info(logger).Log("msg", fmt.Sprintf("HTTP Metrics server serving at :%d", *httpPort))
if err := httpServer.ListenAndServe(); err != http.ErrServerClosed {
return err
}
return nil
})
// gRPC Health Server
healthServer := health.NewServer()
g.Go(func() error {
grpcHealthServer = grpc.NewServer()
healthpb.RegisterHealthServer(grpcHealthServer, healthServer)
haddr := fmt.Sprintf(":%d", *healthPort)
hln, err := net.Listen("tcp", haddr)
if err != nil {
level.Error(logger).Log("msg", "gRPC Health server: failed to listen", "error", err)
os.Exit(2)
}
level.Info(logger).Log("msg", fmt.Sprintf("gRPC health server serving at %s", haddr))
return grpcHealthServer.Serve(hln)
})
// gRPC server
g.Go(func() error {
addr := fmt.Sprintf(":%d", *grpcPort)
ln, err := net.Listen("tcp", addr)
if err != nil {
level.Error(logger).Log("msg", "gRPC server: failed to listen", "error", err)
os.Exit(2)
}
server := &myservice.Server{
Logger: logger,
Health: healthServer,
ServiceName: serviceName,
}
grpcServer = grpc.NewServer(
// MaxConnectionAge is just to avoid long connection, to facilitate load balancing
// MaxConnectionAgeGrace will torn them, default to infinity
grpc.KeepaliveParams(keepalive.ServerParameters{MaxConnectionAge: 2 * time.Minute}),
grpc.StatsHandler(&ocgrpc_propag.ServerHandler{}),
)
myservice.RegisterNyServiceServer(grpcServer, server)
level.Info(logger).Log("msg", fmt.Sprintf("gRPC server serving at %s", addr))
healthServer.SetServingStatus(fmt.Sprintf("grpc.health.v1.%s", serviceName), healthpb.HealthCheckResponse_SERVING)
return grpcServer.Serve(ln)
})
select {
case <-interrupt:
break
case <-ctx.Done():
break
}
level.Warn(logger).Log("msg", "received shutdown signal")
cancel()
healthServer.SetServingStatus(fmt.Sprintf("grpc.health.v1.%s", serviceName), healthpb.HealthCheckResponse_NOT_SERVING)
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer shutdownCancel()
if httpServer != nil {
_ = httpServer.Shutdown(shutdownCtx)
}
if grpcServer != nil {
grpcServer.GracefulStop()
}
if grpcHealthServer != nil {
grpcHealthServer.GracefulStop()
}
err = g.Wait()
if err != nil {
level.Error(logger).Log("msg", "server returning an error", "error", err)
os.Exit(2)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment