package receiver
type ConsumerFunc func(request *nethttp.Request, binding.Message, []binding.TransformerFactory, nethttp.Header) error
type MessageReceiver struct {
consume ConsumerFunc
// logger and httpBindingsReceiver omitted
}
func New(c ConsumerFunc) MessageReceiver {
return Receiver{
consume: c,
}
}
func (r MessageReceiver) Start(ctx context.Context) error {
// ... same as MessageReceiver.Start(ctx)
}
func (r MessageReceiver) ServeHTTP(response nethttp.ResponseWriter, request *nethttp.Request) {
if request.Method != nethttp.MethodPost {
response.WriteHeader(nethttp.StatusMethodNotAllowed)
return
}
// tctx.URI is actually the path...
if request.URL.Path != "/" {
response.WriteHeader(nethttp.StatusNotFound)
return
}
message := http.NewMessageFromHttpRequest(request)
if message.ReadEncoding() == binding.EncodingUnknown {
response.WriteHeader(nethttp.StatusBadRequest)
return
}
transformers := append(defaultTransformers, AddHistory(host)...)
span := trace.FromContext(request.Context())
if span != nil {
transformers = append(transformers, tracing.AddTraceparent(span)...)
}
err = r.consume(request, message, transformers, utils.PassThroughHeaders(request.Header))
if err != nil {
_, unknownChannel := err.(*UnknownChannelError)
_, unknownHost = err.(UnknownHostError)
if unknownChannel || unknownHost {
response.WriteHeader(nethttp.StatusNotFound)
} else {
response.WriteHeader(nethttp.StatusInternalServerError)
}
return
}
response.WriteHeader(nethttp.StatusAccepted)
return
}
Last active
March 30, 2020 09:27
-
-
Save pierDipi/c6685b3d203bb765349cc2a0968b5f12 to your computer and use it in GitHub Desktop.
New NewMessageReceiver()
function for ChannelBasedBroker
:
package channel
func NewMessageReceiver(receiverFunc UnbufferedMessageReceiverFunc, logger *zap.Logger, opts ...MessageReceiverOptions) (receiver.MessageReceiver, error) {
// ... same as before
return receiver.New(func(request *nethttp.Request, message binding.Message, transformers []binding.TransformerFactory, header nethttp.Header) error {
channel, err := r.hostToChannelFunc(request.Host)
if err != nil {
// ...
return err // UnknownHostError || other errors
}
return receiverFunc(request.Context(), channel, message, transformers, header)
})
}
NewMessageReceiver()
for the new broker:
package broker
type UnbufferedMessageReceiverFunc func(context.Context, binding.Message, []binding.TransformerFactory, nethttp.Header) error
func NewMessageReceiver(receiverFunc UnbufferedMessageReceiverFunc, logger *zapLogger) (receiver.Receiver, error) {
return receiver.New(func(request *nethttp.Request, message binding.Message, transformers []binding.TransformerFactory, header nethttp.Header) error {
return receiverFunc(request.Context(), message, transformers, header)
}
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment