Created
December 27, 2023 07:05
-
-
Save Semior001/19b81e0e1fb1972869324957bc7ad188 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package tmiddleware | |
import ( | |
"context" | |
"fmt" | |
"go.temporal.io/sdk/client" | |
"go.temporal.io/sdk/interceptor" | |
"google.golang.org/grpc/metadata" | |
"github.com/cappuccinotm/prototemporal/tmiddleware/overengineered" | |
"regexp" | |
) | |
// GRPCMetadataMemoInterceptor memoizes the metadata from the context and appends it to the | |
// outgoing context. | |
// | |
// WARNING: It is used for tracing purposes, not functional! It doesn't replace or work in any way | |
// with Propagator, but rather stands aside of it, it just duplicates incoming and outgoing | |
// metadata to the memo, which have no functional, but only documentary use! | |
// | |
// Note: it has no use as workflow client encodes the memo into base64 and this behavior is | |
// unchangeable by the moment: | |
// https://github.com/temporalio/sdk-go/blob/master/internal/internal_workflow_client.go#L1440-L1444 | |
func GRPCMetadataMemoInterceptor(rx *regexp.Regexp) interceptor.ClientInterceptor { | |
return overengineered.ExecuteWorkflowOutboundMiddleware(func(next overengineered.ExecuteWorkflowFunc) overengineered.ExecuteWorkflowFunc { | |
return func(ctx context.Context, in *interceptor.ClientExecuteWorkflowInput) (client.WorkflowRun, error) { | |
setByMetadata := func(key string, md metadata.MD) error { | |
outMD := metadata.MD{} | |
for k, v := range md { | |
if !rx.MatchString(k) { | |
continue | |
} | |
if len(v) == 0 { | |
continue | |
} | |
outMD[k] = v | |
} | |
if in.Options.Memo == nil { | |
in.Options.Memo = map[string]interface{}{} | |
} | |
in.Options.Memo[key] = outMD | |
return nil | |
} | |
md, _ := metadata.FromIncomingContext(ctx) | |
if err := setByMetadata("grpc_incoming_metadata", md); err != nil { | |
return nil, fmt.Errorf("set incoming: %w", err) | |
} | |
md, _ = metadata.FromOutgoingContext(ctx) | |
if err := setByMetadata("grpc_outgoing_metadata", md); err != nil { | |
return nil, fmt.Errorf("set outgoing: %w", err) | |
} | |
return next(ctx, in) | |
} | |
}) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package overengineered | |
import ( | |
"context" | |
"go.temporal.io/sdk/client" | |
"go.temporal.io/sdk/interceptor" | |
) | |
// ExecuteWorkflowOutboundMiddleware is a shorthand to make simpler client interceptor. | |
func ExecuteWorkflowOutboundMiddleware(fn func(next ExecuteWorkflowFunc) ExecuteWorkflowFunc) interceptor.ClientInterceptor { | |
return &embeddingClientInterceptor{ | |
COI: func(next interceptor.ClientOutboundInterceptor) interceptor.ClientOutboundInterceptor { | |
return &executeWorkflowInterceptor{ | |
ClientOutboundInterceptorBase: interceptor.ClientOutboundInterceptorBase{Next: next}, | |
fn: fn, | |
} | |
}, | |
} | |
} | |
// embeddingClientInterceptor just implements the interceptor.Interceptor interface to pass | |
// intercepting calls to the COI. | |
type embeddingClientInterceptor struct { | |
COI func(next interceptor.ClientOutboundInterceptor) interceptor.ClientOutboundInterceptor | |
interceptor.InterceptorBase | |
} | |
// InterceptClient implements the interceptor.ClientInterceptor interface. | |
func (i embeddingClientInterceptor) InterceptClient( | |
next interceptor.ClientOutboundInterceptor, | |
) interceptor.ClientOutboundInterceptor { | |
return i.COI(next) | |
} | |
type executeWorkflowInterceptor struct { | |
fn func(next ExecuteWorkflowFunc) ExecuteWorkflowFunc | |
interceptor.ClientOutboundInterceptorBase | |
} | |
func (c *executeWorkflowInterceptor) ExecuteWorkflow( | |
ctx context.Context, | |
in *interceptor.ClientExecuteWorkflowInput, | |
) (client.WorkflowRun, error) { | |
return c.fn(c.Next.ExecuteWorkflow)(ctx, in) | |
} | |
// ExecuteWorkflowFunc is a function type that can be used to execute a workflow. | |
type ExecuteWorkflowFunc func( | |
ctx context.Context, | |
in *interceptor.ClientExecuteWorkflowInput, | |
) (client.WorkflowRun, error) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment