Skip to content

Instantly share code, notes, and snippets.

@Issif
Last active February 26, 2024 14:01
Show Gist options
  • Save Issif/7607f63fcd934149bc00060d47468bb4 to your computer and use it in GitHub Desktop.
Save Issif/7607f63fcd934149bc00060d47468bb4 to your computer and use it in GitHub Desktop.
package main
import (
"fmt"
"log"
"sync"
"time"
"github.com/nats-io/nats.go"
natsserver "github.com/nats-io/nats-server/v2/server"
)
const (
StreamName = "EVENTS"
StreamSubjects = "EVENTS.*"
)
func main() {
log.Println("Starting...")
ns, err := natsserver.NewServer(
&natsserver.Options{
JetStream: true,
Debug: true,
})
if err != nil {
log.Fatal(err)
}
defer ns.Shutdown()
go ns.Start()
if !ns.ReadyForConnections(3 * time.Second) {
log.Fatal("NATS server not ready")
}
js, err := jetStreamInit()
if err != nil {
log.Println(err)
return
}
// Let's assume that publisher and consumer are services running on different servers.
// So run publisher and consumer asynchronously to see how it works
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
publishEVENTS(js)
}()
wg.Add(1)
go func() {
defer wg.Done()
consumeEVENTS(js)
}()
wg.Wait()
log.Println("Exit...")
}
func publishEVENTS(js nats.JetStreamContext) {
for i := 0; i < 1000; i++ {
if _, err := js.Publish(StreamName+".test", []byte("test"), nats.MsgId("test")); err != nil {
log.Println(err)
}
time.Sleep(100 * time.Millisecond)
}
}
func consumeEVENTS(js nats.JetStreamContext) {
_, err := js.Subscribe(StreamSubjects, func(m *nats.Msg) {
if err := m.Ack(); err != nil {
log.Println("Unable to Ack", err)
return
}
log.Printf("Consumer => Subject: %s - Message: %s\n", m.Subject, m.Data)
})
if err != nil {
log.Println("Subscribe failed")
return
}
}
func jetStreamInit() (nats.JetStreamContext, error) {
// Connect to NATS
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
return nil, err
}
// Create JetStream Context
js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
if err != nil {
return nil, err
}
// Create a stream if it does not exist
err = createStream(js)
if err != nil {
return nil, err
}
return js, nil
}
func createStream(jetStream nats.JetStreamContext) error {
stream, err := jetStream.StreamInfo(StreamName)
if err != nil {
if err != nats.ErrStreamNotFound {
log.Fatalf("Unexpected error: %v", err)
}
}
if stream == nil {
log.Printf("Creating stream: %s\n", StreamName)
_, err = jetStream.AddStream(&nats.StreamConfig{
Name: StreamName,
Subjects: []string{StreamSubjects},
Duplicates: 5 * time.Second,
MaxAge: 10 * time.Second,
MaxMsgsPerSubject: 1,
Storage: nats.MemoryStorage,
})
if err != nil {
return err
}
}
info, err := jetStream.StreamInfo(StreamName)
if err != nil {
log.Fatalf("Unexpected error: %v", err)
}
fmt.Printf("Stream info: %+v\n", info)
return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment