Skip to content

Instantly share code, notes, and snippets.

@embano1
Last active April 11, 2024 13:46
Show Gist options
  • Star 26 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save embano1/e0bf49d24f1cdd07cffad93097c04f0a to your computer and use it in GitHub Desktop.
Save embano1/e0bf49d24f1cdd07cffad93097c04f0a to your computer and use it in GitHub Desktop.
gRPC Graceful Shutdown on Client and Server
package main
import (
"context"
"grpc-tutorial/greeter"
"io"
"log"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/golang/protobuf/ptypes/empty"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// main runs the demo gRPC client. It dials the greeter server on
// localhost:8080, opens the SayHelloStream server stream, issues a single
// unary SayHello call after a five second delay, and shuts down when
// SIGINT/SIGTERM arrives or the stream ends.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Bound the blocking dial with a context deadline rather than the
	// deprecated grpc.WithTimeout dial option. The deadline only applies to
	// connection setup, so it is released as soon as the dial returns.
	dialCtx, dialCancel := context.WithTimeout(ctx, 3*time.Second)
	conn, err := grpc.DialContext(dialCtx, "localhost:8080", grpc.WithBlock(), grpc.WithInsecure())
	dialCancel()
	if err != nil {
		log.Fatalf("could not connect to server: %v", err)
	}
	defer conn.Close()

	cli := greeter.NewGreeterClient(conn)
	stream, err := cli.SayHelloStream(ctx, &greeter.HelloRequest{})
	if err != nil {
		log.Fatalf("could not create streaming client: %v", err)
	}

	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
	defer signal.Stop(sigCh)

	var wg sync.WaitGroup

	// Signal watcher: cancels the shared context on SIGINT/SIGTERM, or exits
	// quietly when another goroutine has already cancelled it.
	wg.Add(1)
	go func() {
		defer wg.Done()
		select {
		case <-ctx.Done():
			return
		case s := <-sigCh:
			log.Printf("got signal %v, attempting graceful shutdown", s)
			cancel()
		}
	}()

	// Delayed unary call. The delay is interruptible so that shutdown is not
	// held up for the full five seconds once the context is cancelled.
	wg.Add(1)
	go func() {
		defer wg.Done()

		delay := time.NewTimer(5 * time.Second)
		defer delay.Stop()
		select {
		case <-ctx.Done():
			log.Println("context cancelled")
			return
		case <-delay.C:
		}

		resp, err := cli.SayHello(ctx, &empty.Empty{})
		if err != nil {
			if status.Code(err) == codes.Canceled {
				log.Println("context cancelled")
				return
			}
			log.Fatalf("could not perform regular rpc request: %v", err)
		}
		log.Printf("received SayHello response: %+v", resp)
	}()

	// Stream reader: logs every received value until the server closes the
	// stream (io.EOF) or the context is cancelled, then cancels the context
	// so the other goroutines stop too.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			r, err := stream.Recv()
			if err != nil {
				if err == io.EOF || status.Code(err) == codes.Canceled {
					log.Println("stream closed (context cancelled)")
					cancel()
					return
				}
				log.Fatalf("error while receiving stream response: %v", err)
			}
			log.Printf("received value: %+v", r)
		}
	}()

	wg.Wait()
}
package main
import (
"context"
"fmt"
"grpc-tutorial/greeter"
pb "grpc-tutorial/greeter"
"log"
"math/rand"
"net"
"os"
"os/signal"
"strconv"
"sync"
"syscall"
"time"
"github.com/golang/protobuf/ptypes/empty"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// greeterServer carries the state used by the SayHello and SayHelloStream
// handlers; main registers it with pb.RegisterGreeterServer.
type greeterServer struct {
// intCh is the receive-only channel whose values SayHelloStream forwards to
// the client as HelloReply messages.
intCh <-chan int
}
// generate starts a goroutine that offers one pseudo-random int per second on
// the returned unbuffered channel. When ctx is cancelled the goroutine exits
// and the channel is closed, even if no receiver is currently reading.
func generate(ctx context.Context) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)

		// A single ticker, stopped on exit, replaces a new time.Tick per
		// loop iteration.
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()

		// A goroutine-local source, seeded once, replaces reseeding the
		// global source on every tick.
		rng := rand.New(rand.NewSource(time.Now().UnixNano()))

		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
			}

			// Guard the send as well: without this, a cancelled context
			// would leave the goroutine blocked forever when nobody reads.
			select {
			case ch <- rng.Int():
			case <-ctx.Done():
				return
			}
		}
	}()
	return ch
}
// SayHelloStream forwards every integer received from g.intCh to the client
// as a HelloReply whose Message is the decimal form of the integer.
//
// It returns nil when g.intCh is closed, when the stream's context is done,
// or when Send fails with codes.Canceled; any other Send error is logged and
// returned. The request message is not inspected.
func (g *greeterServer) SayHelloStream(req *pb.HelloRequest, stream pb.Greeter_SayHelloStreamServer) error {
	// NOTE(review): assumes the generated stream type embeds grpc.ServerStream
	// (which provides Context), as protoc-gen-go-grpc output normally does.
	ctx := stream.Context()
	for {
		select {
		case <-ctx.Done():
			// Stop waiting for the next value once the client is gone,
			// instead of finding out only on the next failed Send.
			log.Println("stream closed (context cancelled)")
			return nil
		case n, ok := <-g.intCh:
			if !ok {
				// Producer closed the channel: end the stream normally.
				return nil
			}
			resp := pb.HelloReply{
				Message: strconv.Itoa(n),
			}
			if err := stream.Send(&resp); err != nil {
				if status.Code(err) == codes.Canceled {
					log.Println("stream closed (context cancelled)")
					return nil
				}
				log.Printf("could not send over stream: %v", err)
				return err
			}
			log.Printf("sent %d", n)
		}
	}
}
// SayHello handles the unary RPC by returning a fixed reply; neither the
// context nor the request is consulted, and the error is always nil.
func (g *greeterServer) SayHello(context.Context, *empty.Empty) (*greeter.HelloReply, error) {
	return &pb.HelloReply{Message: "this WORKED!"}, nil
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ch := generate(ctx)
gSrv := greeterServer{
intCh: ch,
}
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", 8080))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
grpc := grpc.NewServer()
pb.RegisterGreeterServer(grpc, &gSrv)
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
s := <-sigCh
log.Printf("got signal %v, attempting graceful shutdown", s)
cancel()
grpc.GracefulStop()
// grpc.Stop() // leads to error while receiving stream response: rpc error: code = Unavailable desc = transport is closing
wg.Done()
}()
log.Println("starting grpc server")
err = grpc.Serve(lis)
if err != nil {
log.Fatalf("could not serve: %v", err)
}
wg.Wait()
log.Println("clean shutdown")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment