Skip to content

Instantly share code, notes, and snippets.

@Semior001
Created December 27, 2023 07:05
Show Gist options
  • Save Semior001/19b81e0e1fb1972869324957bc7ad188 to your computer and use it in GitHub Desktop.
Save Semior001/19b81e0e1fb1972869324957bc7ad188 to your computer and use it in GitHub Desktop.
package tmiddleware
import (
"context"
"fmt"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/interceptor"
"google.golang.org/grpc/metadata"
"github.com/cappuccinotm/prototemporal/tmiddleware/overengineered"
"regexp"
)
// GRPCMetadataMemoInterceptor memoizes the metadata from the context and appends it to the
// outgoing context.
//
// WARNING: It is used for tracing purposes, not functional! It doesn't replace or work in any way
// with Propagator, but rather stands aside of it, it just duplicates incoming and outgoing
// metadata to the memo, which have no functional, but only documentary use!
//
// Note: it has no use as workflow client encodes the memo into base64 and this behavior is
// unchangeable by the moment:
// https://github.com/temporalio/sdk-go/blob/master/internal/internal_workflow_client.go#L1440-L1444
func GRPCMetadataMemoInterceptor(rx *regexp.Regexp) interceptor.ClientInterceptor {
return overengineered.ExecuteWorkflowOutboundMiddleware(func(next overengineered.ExecuteWorkflowFunc) overengineered.ExecuteWorkflowFunc {
return func(ctx context.Context, in *interceptor.ClientExecuteWorkflowInput) (client.WorkflowRun, error) {
setByMetadata := func(key string, md metadata.MD) error {
outMD := metadata.MD{}
for k, v := range md {
if !rx.MatchString(k) {
continue
}
if len(v) == 0 {
continue
}
outMD[k] = v
}
if in.Options.Memo == nil {
in.Options.Memo = map[string]interface{}{}
}
in.Options.Memo[key] = outMD
return nil
}
md, _ := metadata.FromIncomingContext(ctx)
if err := setByMetadata("grpc_incoming_metadata", md); err != nil {
return nil, fmt.Errorf("set incoming: %w", err)
}
md, _ = metadata.FromOutgoingContext(ctx)
if err := setByMetadata("grpc_outgoing_metadata", md); err != nil {
return nil, fmt.Errorf("set outgoing: %w", err)
}
return next(ctx, in)
}
})
}
package overengineered
import (
"context"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/interceptor"
)
// ExecuteWorkflowOutboundMiddleware is a shorthand to make simpler client interceptor.
func ExecuteWorkflowOutboundMiddleware(fn func(next ExecuteWorkflowFunc) ExecuteWorkflowFunc) interceptor.ClientInterceptor {
return &embeddingClientInterceptor{
COI: func(next interceptor.ClientOutboundInterceptor) interceptor.ClientOutboundInterceptor {
return &executeWorkflowInterceptor{
ClientOutboundInterceptorBase: interceptor.ClientOutboundInterceptorBase{Next: next},
fn: fn,
}
},
}
}
// embeddingClientInterceptor just implements the interceptor.Interceptor interface to pass
// intercepting calls to the COI.
type embeddingClientInterceptor struct {
COI func(next interceptor.ClientOutboundInterceptor) interceptor.ClientOutboundInterceptor
interceptor.InterceptorBase
}
// InterceptClient implements the interceptor.ClientInterceptor interface.
func (i embeddingClientInterceptor) InterceptClient(
next interceptor.ClientOutboundInterceptor,
) interceptor.ClientOutboundInterceptor {
return i.COI(next)
}
type executeWorkflowInterceptor struct {
fn func(next ExecuteWorkflowFunc) ExecuteWorkflowFunc
interceptor.ClientOutboundInterceptorBase
}
func (c *executeWorkflowInterceptor) ExecuteWorkflow(
ctx context.Context,
in *interceptor.ClientExecuteWorkflowInput,
) (client.WorkflowRun, error) {
return c.fn(c.Next.ExecuteWorkflow)(ctx, in)
}
// ExecuteWorkflowFunc is a function type that can be used to execute a workflow.
type ExecuteWorkflowFunc func(
ctx context.Context,
in *interceptor.ClientExecuteWorkflowInput,
) (client.WorkflowRun, error)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment