Skip to content

Instantly share code, notes, and snippets.

@Ja7ad
Last active October 10, 2023 08:54
Show Gist options
  • Save Ja7ad/2d8066dd3e01245f71e8c116aeb0d9fc to your computer and use it in GitHub Desktop.
Save Ja7ad/2d8066dd3e01245f71e8c116aeb0d9fc to your computer and use it in GitHub Desktop.
nats nak with dynamic delay (step by step)
package main
import (
"context"
"encoding/json"
"errors"
"github.com/brianvoe/gofakeit/v6"
"github.com/hashicorp/golang-lru/v2/expirable"
"github.com/lithammer/shortuuid/v3"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"log"
"os"
"time"
)
const (
subject = "test.foo"
)
type Broker interface {
Publish(ctx context.Context, subject string, payload []byte) error
Consume(errCh chan<- error)
NackWithDynamicDelay(msgId string, nakFunc func(delay time.Duration) error) error
}
type Nats struct {
js jetstream.JetStream
consumer jetstream.Consumer
cache *expirable.LRU[string, uint8]
}
type Person struct {
Id uint8 `json:"id"`
Name string `json:"name"`
Age uint8 `json:"age"`
}
func main() {
numOfMsg := 5
errCh := make(chan error)
ns, err := New(context.Background(),
os.Getenv("NATS_URL"),
os.Getenv("NATS_USERNAME"),
os.Getenv("NATS_PASSWORD"),
)
if err != nil {
log.Fatal(err)
}
for i := 0; i < numOfMsg; i++ {
go func() {
b, err := json.Marshal(&Person{
Id: gofakeit.Uint8(),
Name: gofakeit.Name(),
Age: gofakeit.Uint8(),
})
if err != nil {
log.Fatal(err)
}
if err := ns.Publish(context.Background(), subject, b); err != nil {
log.Fatal(err)
}
}()
}
go ns.Consume(errCh)
if err := <-errCh; err != nil {
log.Fatal(err)
}
}
func New(ctx context.Context, url, username, password string) (Broker, error) {
ns := new(Nats)
cli, err := nats.Connect(
url,
nats.UserInfo(username, password),
nats.MaxReconnects(1000),
nats.RetryOnFailedConnect(true),
nats.ReconnectWait(3*time.Second))
if err != nil {
return nil, err
}
if !cli.IsConnected() {
return nil, errors.New("nats is not connected")
}
js, err := jetstream.New(cli)
if err != nil {
return nil, err
}
ns.js = js
ns.cache = expirable.NewLRU[string, uint8](100, nil, 30*time.Minute)
s, err := ns.js.CreateStream(ctx, jetstream.StreamConfig{
Name: "test",
Subjects: []string{"test.*"},
Retention: jetstream.WorkQueuePolicy,
Storage: jetstream.FileStorage,
})
if err != nil {
return nil, err
}
c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Name: "local",
Durable: "local",
AckPolicy: jetstream.AckExplicitPolicy,
})
if err != nil {
return nil, err
}
ns.consumer = c
return ns, nil
}
func (n *Nats) Publish(ctx context.Context, subject string, payload []byte) error {
header := make(nats.Header)
header.Set("Nats-Msg-Id", shortuuid.New())
_, err := n.js.PublishMsg(ctx, &nats.Msg{
Subject: subject,
Data: payload,
Header: header,
})
return err
}
func (n *Nats) Consume(errCh chan<- error) {
msgCtx, err := n.consumer.Messages()
if err != nil {
errCh <- err
return
}
for {
msg, err := msgCtx.Next()
if err != nil {
errCh <- err
return
}
msgId := msg.Headers().Get("Nats-Msg-Id")
if msg.Subject() != subject {
continue
}
var person Person
if err := json.Unmarshal(msg.Data(), &person); err != nil {
errCh <- err
return
}
if err := printPerson(person); err != nil {
if err := n.NackWithDynamicDelay(msgId, msg.NakWithDelay); err != nil {
errCh <- err
}
continue
}
msg.Ack()
}
}
func (n *Nats) NackWithDynamicDelay(msgId string, nakFunc func(delay time.Duration) error) error {
var duration uint8
v, _ := n.cache.Get(msgId)
if v == 254 {
duration = v
} else if v < 254 {
duration = v + 2
}
n.cache.Add(msgId, duration)
return nakFunc(time.Duration(duration) * time.Second)
}
func printPerson(person Person) error {
log.Println(person)
return errors.New("example err")
}
// decorated jetstream.Msg for NakWithDynamicDelay
package nats
import (
"github.com/hashicorp/golang-lru/v2/expirable"
"github.com/nats-io/nats.go/jetstream"
"time"
)
type Msg interface {
jetstream.Msg
// NakWithDynamicDelay dynamic delay of naked massage step by step max 4 min
NakWithDynamicDelay() error
}
type Message struct {
jetstream.Msg
cache *expirable.LRU[string, uint8]
}
func newCustomMsg(msg jetstream.Msg, size int, ttl time.Duration) Msg {
return &Message{
cache: expirable.NewLRU[string, uint8](size, nil, ttl),
Msg: msg,
}
}
func (m *Message) NakWithDynamicDelay() error {
var duration uint8
msgId := m.Headers().Get("Nats-Msg-Id")
cacheKey := msgId[:8]
v, _ := m.cache.Get(cacheKey)
if v == 254 {
duration = v
} else if v < 254 {
duration = v + 2
}
m.cache.Add(cacheKey, duration)
return m.NakWithDelay(time.Duration(duration) * time.Second)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment