Skip to content

Instantly share code, notes, and snippets.

@koblas
Created July 19, 2024 17:26
Show Gist options
  • Save koblas/26ad55243ec98dcdd4fcdd67a22df54c to your computer and use it in GitHub Desktop.
Save koblas/26ad55243ec98dcdd4fcdd67a22df54c to your computer and use it in GitHub Desktop.
func natsToHTTP(msg jetstream.Msg) *http.Request {
hdrs := msg.Headers()
request := http.Request{
URL: &url.URL{
Scheme: "queue",
Path: hdrs.Get(jetURLPath),
},
Method: http.MethodPost,
Header: http.Header(hdrs),
Body: io.NopCloser(bytes.NewReader(msg.Data())),
}
return &request
}
func messageProcessor(ctx context.Context, mux *http.ServeMux, worker chan jetstream.Msg) {
log := logkit.FromContext(ctx)
for {
select {
case <-ctx.Done():
return
case msg := <-worker:
req := natsToHTTP(msg)
w := httptest.NewRecorder()
mux.ServeHTTP(w, req.WithContext(ctx))
res := w.Result()
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
buf, _ := io.ReadAll(io.LimitReader(res.Body, 1024))
log.With("statusCode", res.StatusCode).With("statusMsg", string(buf)).Info("nats consumer error")
}
_ = msg.Ack()
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment