Skip to content

Instantly share code, notes, and snippets.

@akhenakh
Last active May 14, 2022
Embed
What would you like to do?
Example of graceful shutdown with a gRPC health server and an HTTP server
readinessProbe:
  exec:
    command: ["/root/grpc_health_probe", "-addr=:6666"]
  initialDelaySeconds: 1
livenessProbe:
  exec:
    command: ["/root/grpc_health_probe", "-addr=:6666"]
  initialDelaySeconds: 2
imagePullPolicy: IfNotPresent
package main
import (
	"context"
	"errors"
	"fmt"
	stdlog "log"
	"net"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/go-kit/kit/log/level"
	"github.com/namsral/flag"
	"golang.org/x/sync/errgroup"
	"google.golang.org/grpc"
	"google.golang.org/grpc/health"
	healthpb "google.golang.org/grpc/health/grpc_health_v1"
	"google.golang.org/grpc/keepalive"
)
// serviceName is handed to the logger and used by main to build the gRPC
// health status key "grpc.health.v1.<serviceName>".
const serviceName = "myserviced"

// version is logged at startup.
// NOTE(review): presumably overridden at build time via
// -ldflags "-X main.version=..."; confirm against the build scripts.
var version = "no version"

// Command-line flags (parsed by main via flag.Parse).
var (
	httpPort     = flag.Int("httpPort", 8888, "http port")
	grpcPort     = flag.Int("grpcPort", 9200, "grpc port")
	healthPort   = flag.Int("healthPort", 6666, "grpc health port")
	logLevel     = flag.String("logLevel", "INFO", "Log level, INFO|WARNING|DEBUG|ERROR")
	gcpProjectID = flag.String("gcpProjectID", "none", "GCP Project ID")
	dev          = flag.Bool("dev", false, "Enable development tooling")
)

// Server handles kept at package level so main can stop them on shutdown.
var (
	grpcServer       *grpc.Server
	grpcHealthServer *grpc.Server
	httpServer       *http.Server
)
func main() {
flag.Parse()
logger, err := probe.InitLogger(serviceName, *logLevel)
if err != nil {
stdlog.Fatal("can't init logger", err)
}
level.Info(logger).Log("msg", "Starting app", "version", version)
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt, syscall.SIGTERM)
defer signal.Stop(interrupt)
g, ctx := errgroup.WithContext(ctx)
// web server metrics
g.Go(func() error {
httpServer = &http.Server{
Addr: fmt.Sprintf(":%d", *httpPort),
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
}
level.Info(logger).Log("msg", fmt.Sprintf("HTTP Metrics server serving at :%d", *httpPort))
if err := httpServer.ListenAndServe(); err != http.ErrServerClosed {
return err
}
return nil
})
// gRPC Health Server
healthServer := health.NewServer()
g.Go(func() error {
grpcHealthServer = grpc.NewServer()
healthpb.RegisterHealthServer(grpcHealthServer, healthServer)
haddr := fmt.Sprintf(":%d", *healthPort)
hln, err := net.Listen("tcp", haddr)
if err != nil {
level.Error(logger).Log("msg", "gRPC Health server: failed to listen", "error", err)
os.Exit(2)
}
level.Info(logger).Log("msg", fmt.Sprintf("gRPC health server serving at %s", haddr))
return grpcHealthServer.Serve(hln)
})
// gRPC server
g.Go(func() error {
addr := fmt.Sprintf(":%d", *grpcPort)
ln, err := net.Listen("tcp", addr)
if err != nil {
level.Error(logger).Log("msg", "gRPC server: failed to listen", "error", err)
os.Exit(2)
}
server := &myservice.Server{
Logger: logger,
Health: healthServer,
ServiceName: serviceName,
}
grpcServer = grpc.NewServer(
// MaxConnectionAge is just to avoid long connection, to facilitate load balancing
// MaxConnectionAgeGrace will torn them, default to infinity
grpc.KeepaliveParams(keepalive.ServerParameters{MaxConnectionAge: 2 * time.Minute}),
grpc.StatsHandler(&ocgrpc_propag.ServerHandler{}),
)
myservice.RegisterNyServiceServer(grpcServer, server)
level.Info(logger).Log("msg", fmt.Sprintf("gRPC server serving at %s", addr))
healthServer.SetServingStatus(fmt.Sprintf("grpc.health.v1.%s", serviceName), healthpb.HealthCheckResponse_SERVING)
return grpcServer.Serve(ln)
})
select {
case <-interrupt:
break
case <-ctx.Done():
break
}
level.Warn(logger).Log("msg", "received shutdown signal")
cancel()
healthServer.SetServingStatus(fmt.Sprintf("grpc.health.v1.%s", serviceName), healthpb.HealthCheckResponse_NOT_SERVING)
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer shutdownCancel()
if httpServer != nil {
_ = httpServer.Shutdown(shutdownCtx)
}
if grpcServer != nil {
grpcServer.GracefulStop()
}
if grpcHealthServer != nil {
grpcHealthServer.GracefulStop()
}
err = g.Wait()
if err != nil {
level.Error(logger).Log("msg", "server returning an error", "error", err)
os.Exit(2)
}
}
@nigeltiany
Copy link

nigeltiany commented Nov 24, 2019

Please explain this command: ["/root/grpc_health_probe", "-addr=:6666"]
How does it end up inside a k8s pod? Is it added during the Docker build? Is it the same app or a different binary?

@akhenakh
Copy link
Author

akhenakh commented Nov 24, 2019

Simply copy the probe binary into the image in your Dockerfile, e.g.:

FROM gcr.io/distroless/static

WORKDIR /root/
COPY geoipd .
COPY grpc_health_probe .
ENTRYPOINT ["./geoipd"]

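The readinessProbe executes the given command inside the container. That command can be a separate binary shipped in the same image (as with grpc_health_probe above) or the application binary itself.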

@wtask
Copy link

wtask commented Jan 12, 2020

Why is cancel() called explicitly even though there is a deferred call above?

@akhenakh
Copy link
Author

akhenakh commented Jan 12, 2020

We want cancel() to happen early, right after the signal is caught.

The defer cancel() is superfluous. Good catch.

@wtask
Copy link

wtask commented Jan 15, 2020

I think it's a redundant operation. The errgroup's inner context is either already cancelled (the case <-ctx.Done() branch, triggered by an error) or will be cancelled as soon as the first server shuts down or stops. Also, when the context is cancelled outside the errgroup flow, the goroutines keep running despite the cancellation, because the code inside the g.Go() goroutines does not depend on the cancelled context.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment