Skip to content

Instantly share code, notes, and snippets.

@piotrpio

piotrpio/main.go Secret

Last active January 31, 2023 11:44
Show Gist options
  • Save piotrpio/9f023c71a317d446acfc817352cc7e6a to your computer and use it in GitHub Desktop.
Save piotrpio/9f023c71a317d446acfc817352cc7e6a to your computer and use it in GitHub Desktop.
package main
import (
"errors"
"fmt"
"log"
"time"
"github.com/nats-io/nats.go"
)
func main() {
nc, err := nats.Connect("127.0.0.1:4222")
if err != nil {
log.Fatal(err)
}
defer nc.Flush()
js, err := nc.JetStream()
if err != nil {
log.Fatal(err)
}
// check if stream exists
_, err = js.StreamInfo("audit-stream")
if err != nil {
if errors.Is(err, nats.ErrStreamNotFound) {
// Create a test stream if it doesn't exist
_, err = js.AddStream(&nats.StreamConfig{
Name: "audit-stream",
Storage: nats.FileStorage,
Subjects: []string{"audit", "logs"},
})
if err != nil {
log.Println(err)
}
fmt.Println("New Stream created")
} else {
log.Println(err)
return
}
} else {
fmt.Println("Stream already exists")
}
sub, err := js.PullSubscribe("logs", "logs-consumer")
if err != nil {
log.Println("logs", ":", err)
return
}
for {
msgs, err := sub.Fetch(1000)
if err != nil {
fmt.Println(err)
continue
}
for _, m := range msgs {
// ... doing stuff with the message
fmt.Println("msg: ", string(m.Data))
if err := m.Ack(); err != nil {
log.Println("logs", ": ", err)
continue
}
}
// pause for some time between fetches
time.Sleep(10 * time.Second)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment