Skip to content

Instantly share code, notes, and snippets.

@pierDipi
Last active March 30, 2020 09:27
Show Gist options
  • Save pierDipi/c6685b3d203bb765349cc2a0968b5f12 to your computer and use it in GitHub Desktop.
Save pierDipi/c6685b3d203bb765349cc2a0968b5f12 to your computer and use it in GitHub Desktop.
package receiver

type ConsumerFunc func(request *nethttp.Request, binding.Message, []binding.TransformerFactory, nethttp.Header) error

// MessageReceiver is an HTTP handler that validates an incoming request and
// hands the resulting message to a ConsumerFunc.
type MessageReceiver struct {
  // consume is called by ServeHTTP once the request has passed validation.
  consume  ConsumerFunc
  
  // logger and httpBindingsReceiver omitted
}

// New returns a MessageReceiver that passes every accepted request to c.
func New(c ConsumerFunc) MessageReceiver {
	return MessageReceiver{
		consume: c,
	}
}

// Start is declared to take a context and return an error; its body is elided
// in this sketch.
// NOTE(review): presumably this reuses the pre-existing channel
// MessageReceiver.Start implementation — confirm before relying on it.
func (r MessageReceiver) Start(ctx context.Context) error {
  // ... same as MessageReceiver.Start(ctx)
}

// ServeHTTP validates an incoming request, converts it to a binding message and
// passes it to the configured ConsumerFunc.
//
// Status codes written:
//   - 405 when the method is not POST
//   - 404 when the path is not "/", or when the consumer returns an
//     *UnknownChannelError or an UnknownHostError
//   - 400 when the message encoding is unknown
//   - 500 when the consumer returns any other error
//   - 202 when the consumer succeeds
func (r MessageReceiver) ServeHTTP(response nethttp.ResponseWriter, request *nethttp.Request) {
	if request.Method != nethttp.MethodPost {
		response.WriteHeader(nethttp.StatusMethodNotAllowed)
		return
	}

	// Only the root path is served.
	if request.URL.Path != "/" {
		response.WriteHeader(nethttp.StatusNotFound)
		return
	}

	message := http.NewMessageFromHttpRequest(request)
	if message.ReadEncoding() == binding.EncodingUnknown {
		response.WriteHeader(nethttp.StatusBadRequest)
		return
	}

	host := request.Host

	// Cap the capacity of defaultTransformers so that append always copies into
	// a fresh backing array instead of writing into one that concurrent
	// requests may share.
	transformers := append(defaultTransformers[:len(defaultTransformers):len(defaultTransformers)], AddHistory(host)...)
	if span := trace.FromContext(request.Context()); span != nil {
		transformers = append(transformers, tracing.AddTraceparent(span)...)
	}

	err := r.consume(request, message, transformers, utils.PassThroughHeaders(request.Header))
	switch err.(type) {
	case nil:
		response.WriteHeader(nethttp.StatusAccepted)
	case *UnknownChannelError, UnknownHostError:
		// The target of the request could not be resolved.
		response.WriteHeader(nethttp.StatusNotFound)
	default:
		response.WriteHeader(nethttp.StatusInternalServerError)
	}
}

The new NewMessageReceiver() function for the ChannelBasedBroker:

package channel

// NewMessageReceiver returns a receiver.MessageReceiver whose consumer resolves
// the request's Host to a channel and forwards the message to receiverFunc.
//
// An error from the host lookup (an UnknownHostError or any other error) is
// returned to the receiver unchanged. The setup that defines r is elided in
// this sketch.
func NewMessageReceiver(receiverFunc UnbufferedMessageReceiverFunc, logger *zap.Logger, opts ...MessageReceiverOptions) (receiver.MessageReceiver, error) {
	// ... same as before

	consume := func(request *nethttp.Request, message binding.Message, transformers []binding.TransformerFactory, header nethttp.Header) error {
		channel, err := r.hostToChannelFunc(request.Host)
		if err != nil {
			// ...
			return err // UnknownHostError || other errors
		}

		return receiverFunc(request.Context(), channel, message, transformers, header)
	}

	return receiver.New(consume), nil
}

NewMessageReceiver() for the new broker:

package broker

// UnbufferedMessageReceiverFunc is the broker-side callback invoked by the
// receiver built in NewMessageReceiver. It is given the request's context, the
// message, the transformers to apply and the pass-through headers.
type UnbufferedMessageReceiverFunc func(
	ctx context.Context,
	message binding.Message,
	transformers []binding.TransformerFactory,
	header nethttp.Header,
) error

func NewMessageReceiver(receiverFunc UnbufferedMessageReceiverFunc, logger *zapLogger) (receiver.Receiver, error) {
  return receiver.New(func(request *nethttp.Request, message binding.Message, transformers []binding.TransformerFactory, header nethttp.Header) error {
    return receiverFunc(request.Context(), message, transformers, header)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment