Skip to content

Instantly share code, notes, and snippets.

@akhenakh
Last active March 4, 2023 19:22
Show Gist options
  • Star 47 You must be signed in to star a gist
  • Fork 13 You must be signed in to fork a gist
  • Save akhenakh/38dbfea70dc36964e23acc19777f3869 to your computer and use it in GitHub Desktop.
Save akhenakh/38dbfea70dc36964e23acc19777f3869 to your computer and use it in GitHub Desktop.
Example of graceful shutdown with a gRPC health server and an HTTP server
readinessProbe:
  exec:
    command: ["/root/grpc_health_probe", "-addr=:6666"]
  initialDelaySeconds: 1
livenessProbe:
  exec:
    command: ["/root/grpc_health_probe", "-addr=:6666"]
  initialDelaySeconds: 2
imagePullPolicy: IfNotPresent
package main
import (
"context"
"fmt"
stdlog "log"
"net"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"github.com/go-kit/kit/log/level"
"github.com/namsral/flag"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/keepalive"
)
// serviceName is passed to the logger initialiser and is used to build the
// service name reported through the gRPC health server.
const serviceName = "myserviced"
// Command-line flags, plus the server handles that main creates and later
// stops during shutdown.
var (
// version is logged once at startup.
// NOTE(review): presumably overridden at build time (e.g. -ldflags "-X") — confirm.
version = "no version"
// httpPort is the TCP port the HTTP server listens on.
httpPort = flag.Int("httpPort", 8888, "http port")
// grpcPort is the TCP port the application gRPC server listens on.
grpcPort = flag.Int("grpcPort", 9200, "grpc port")
// healthPort is the TCP port the dedicated gRPC health server listens on.
healthPort = flag.Int("healthPort", 6666, "grpc health port")
// logLevel is handed to the logger initialiser in main.
logLevel = flag.String("logLevel", "INFO", "Log level, INFO|WARNING|DEBUG|ERROR")
// gcpProjectID is not referenced by the code visible in this file.
gcpProjectID = flag.String("gcpProjectID", "none", "GCP Project ID")
// dev is not referenced by the code visible in this file.
dev = flag.Bool("dev", false, "Enable development tooling")
// Server handles are package-level so main's shutdown sequence can reach them.
grpcServer *grpc.Server
grpcHealthServer *grpc.Server
httpServer *http.Server
)
func main() {
flag.Parse()
logger, err := probe.InitLogger(serviceName, *logLevel)
if err != nil {
stdlog.Fatal("can't init logger", err)
}
level.Info(logger).Log("msg", "Starting app", "version", version)
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt, syscall.SIGTERM)
defer signal.Stop(interrupt)
g, ctx := errgroup.WithContext(ctx)
// web server metrics
g.Go(func() error {
httpServer = &http.Server{
Addr: fmt.Sprintf(":%d", *httpPort),
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
}
level.Info(logger).Log("msg", fmt.Sprintf("HTTP Metrics server serving at :%d", *httpPort))
if err := httpServer.ListenAndServe(); err != http.ErrServerClosed {
return err
}
return nil
})
// gRPC Health Server
healthServer := health.NewServer()
g.Go(func() error {
grpcHealthServer = grpc.NewServer()
healthpb.RegisterHealthServer(grpcHealthServer, healthServer)
haddr := fmt.Sprintf(":%d", *healthPort)
hln, err := net.Listen("tcp", haddr)
if err != nil {
level.Error(logger).Log("msg", "gRPC Health server: failed to listen", "error", err)
os.Exit(2)
}
level.Info(logger).Log("msg", fmt.Sprintf("gRPC health server serving at %s", haddr))
return grpcHealthServer.Serve(hln)
})
// gRPC server
g.Go(func() error {
addr := fmt.Sprintf(":%d", *grpcPort)
ln, err := net.Listen("tcp", addr)
if err != nil {
level.Error(logger).Log("msg", "gRPC server: failed to listen", "error", err)
os.Exit(2)
}
server := &myservice.Server{
Logger: logger,
Health: healthServer,
ServiceName: serviceName,
}
grpcServer = grpc.NewServer(
// MaxConnectionAge is just to avoid long connection, to facilitate load balancing
// MaxConnectionAgeGrace will torn them, default to infinity
grpc.KeepaliveParams(keepalive.ServerParameters{MaxConnectionAge: 2 * time.Minute}),
grpc.StatsHandler(&ocgrpc_propag.ServerHandler{}),
)
myservice.RegisterNyServiceServer(grpcServer, server)
level.Info(logger).Log("msg", fmt.Sprintf("gRPC server serving at %s", addr))
healthServer.SetServingStatus(fmt.Sprintf("grpc.health.v1.%s", serviceName), healthpb.HealthCheckResponse_SERVING)
return grpcServer.Serve(ln)
})
select {
case <-interrupt:
break
case <-ctx.Done():
break
}
level.Warn(logger).Log("msg", "received shutdown signal")
cancel()
healthServer.SetServingStatus(fmt.Sprintf("grpc.health.v1.%s", serviceName), healthpb.HealthCheckResponse_NOT_SERVING)
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer shutdownCancel()
if httpServer != nil {
_ = httpServer.Shutdown(shutdownCtx)
}
if grpcServer != nil {
grpcServer.GracefulStop()
}
if grpcHealthServer != nil {
grpcHealthServer.GracefulStop()
}
err = g.Wait()
if err != nil {
level.Error(logger).Log("msg", "server returning an error", "error", err)
os.Exit(2)
}
}
@akhenakh
Copy link
Author

We want cancel() to happen early, right after the signal catch.

The defer cancel() is superfluous. Good catch.

@wtask
Copy link

wtask commented Jan 15, 2020

I think it's a redundant operation, as the inner context of the errgroup is either already cancelled (case <-ctx.Done(), due to an error) or will be cancelled immediately after the first shutdown/stop. When the context is cancelled outside the errgroup flow, the group of "threads" will continue running despite the cancellation, because the code inside the g.Go() "threads" does not depend on the cancelled context.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment