Skip to content

Instantly share code, notes, and snippets.

@Ja7ad
Created October 10, 2023 08:53
Show Gist options
  • Save Ja7ad/868d14a8b58cd97bf4e63c19d5112995 to your computer and use it in GitHub Desktop.
Save Ja7ad/868d14a8b58cd97bf4e63c19d5112995 to your computer and use it in GitHub Desktop.
decorated jetstream.Msg for NakWithDynamicDelay
package nats
import (
"github.com/hashicorp/golang-lru/v2/expirable"
"github.com/nats-io/nats.go/jetstream"
"time"
)
// Msg extends jetstream.Msg with a negative acknowledgement whose redelivery
// delay is chosen dynamically.
type Msg interface {
	jetstream.Msg
	// NakWithDynamicDelay negatively acknowledges the message, asking for
	// redelivery after a delay that grows step by step up to a maximum of
	// roughly 4 minutes.
	NakWithDynamicDelay() error
}
// Message wraps a jetstream.Msg and keeps the bookkeeping needed by
// NakWithDynamicDelay.
type Message struct {
	jetstream.Msg
	// cache holds the most recently used nak delay, in seconds, keyed by a
	// prefix of the message's "Nats-Msg-Id" header. Entries expire according
	// to the TTL the cache was constructed with.
	cache *expirable.LRU[string, uint8]
}
// newCustomMsg decorates msg so that it also satisfies Msg.
//
// size is the maximum number of entries kept in the delay cache and ttl is
// how long an entry survives before it expires.
//
// NOTE(review): a fresh cache is built on every call, so delay history is not
// shared between values returned by separate calls — confirm this is intended.
func newCustomMsg(msg jetstream.Msg, size int, ttl time.Duration) Msg {
	delays := expirable.NewLRU[string, uint8](size, nil, ttl)

	wrapped := &Message{
		Msg:   msg,
		cache: delays,
	}
	return wrapped
}
// NakWithDynamicDelay negatively acknowledges the message and asks the server
// to redeliver it after a delay that grows with every call recorded for the
// same message ID.
//
// The delay starts at 2 seconds, increases by 2 seconds per call and is capped
// at 254 seconds (about 4 minutes). The previous delay is looked up in the
// cache under the first 8 bytes of the "Nats-Msg-Id" header; when the header is
// shorter than that (or absent) the whole value is used as the key instead of
// slicing out of range.
//
// It returns whatever error NakWithDelay reports.
func (m *Message) NakWithDynamicDelay() error {
	const (
		keyLen       = 8   // number of leading message-ID bytes used as cache key
		stepSeconds  = 2   // growth of the delay per call
		limitSeconds = 254 // largest delay that still fits the uint8 cache value
	)

	cacheKey := m.Headers().Get("Nats-Msg-Id")
	if len(cacheKey) > keyLen {
		cacheKey = cacheKey[:keyLen]
	}

	// A cache miss yields the zero value, so the first nak waits stepSeconds.
	previous, _ := m.cache.Get(cacheKey)

	// Compare before adding so the uint8 arithmetic can never wrap around.
	delay := uint8(limitSeconds)
	if previous <= limitSeconds-stepSeconds {
		delay = previous + stepSeconds
	}

	m.cache.Add(cacheKey, delay)
	return m.NakWithDelay(time.Duration(delay) * time.Second)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment