Skip to content

Instantly share code, notes, and snippets.

@carlware
Last active April 25, 2019 00:44
Show Gist options
  • Save carlware/e33e0691853b5f3e4d5c04ae452926c3 to your computer and use it in GitHub Desktop.
Save carlware/e33e0691853b5f3e4d5c04ae452926c3 to your computer and use it in GitHub Desktop.
unary and stream interceptors in go
s := grpc.NewServer(
grpc.StreamInterceptor(
grpc_middleware.ChainStreamServer(
GetStreamServerInterceptor(),
grpc_prometheus.StreamServerInterceptor)),
grpc.UnaryInterceptor(
grpc_middleware.ChainUnaryServer(
GetUnaryServerInterceptor(),
grpc_prometheus.UnaryServerInterceptor)))
func SpanFromMetadata(name string, ctx context.Context) (*trace.Span, bool) {
if md, ok := metadata.FromContext(ctx); ok {
traceContext, ok := md["x-cloud-trace-context"]
if ok && len(traceContext) > 0 {
s := trace.SpanFromHeader(name, traceContext[0])
return s, true
}
}
return traceClient.NewSpan(name), false
}
func GetUnaryServerInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
methodName := fmt.Sprintf("server:%s", info.FullMethod)
span, ok := SpanFromMetadata(methodName, ctx)
if !ok {
span = trace.NewSpan(methodName)
}
newCtx := trace.NewContext(ctx, span)
result, err := handler(newCtx, req)
if err != nil {
span.SetLabel("error", err.Error())
}
span.Finish()
return result, err
}
}
func GetStreamServerInterceptor() grpc.StreamServerInterceptor {
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
methodName := fmt.Sprintf("server:%s", info.FullMethod)
span, ok := SpanFromMetadata(methodName, ss.Context())
if !ok {
span = trace.NewSpan(methodName)
}
//TODO Stream context cannot be modified. We should add trace data through the SetHeader method
err := handler(srv, ss)
if err != nil {
span.SetLabel("error", err.Error())
}
span.Finish()
return err
}
}
func GetUnaryClientInterceptor() grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
methodName := fmt.Sprintf("client:%s", method)
span := trace.FromContext(ctx).NewChild(methodName)
err := invoker(ctx, method, req, reply, cc, opts...)
if err != nil {
span.SetLabel("error", err.Error())
}
span.Finish()
return err
}
}
func GetStreamClientInterceptor() grpc.StreamClientInterceptor {
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
methodName := fmt.Sprintf("client:%s", method)
span := trace.FromContext(ctx).NewChild(methodName)
cs, err := streamer(ctx, desc, cc, method, opts...)
if err != nil {
span.SetLabel("error", err.Error())
}
span.Finish()
return cs, err
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment