Last active
February 26, 2024 14:01
-
-
Save Issif/7607f63fcd934149bc00060d47468bb4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"log" | |
"sync" | |
"time" | |
"github.com/nats-io/nats.go" | |
natsserver "github.com/nats-io/nats-server/v2/server" | |
) | |
const (
	// StreamName is the name of the JetStream stream used by this program.
	StreamName = "EVENTS"

	// StreamSubjects is the subject pattern bound to the stream: every
	// single-token subject directly under StreamName (i.e. "EVENTS.*").
	StreamSubjects = StreamName + ".*"
)
func main() { | |
log.Println("Starting...") | |
ns, err := natsserver.NewServer( | |
&natsserver.Options{ | |
JetStream: true, | |
Debug: true, | |
}) | |
if err != nil { | |
log.Fatal(err) | |
} | |
defer ns.Shutdown() | |
go ns.Start() | |
if !ns.ReadyForConnections(3 * time.Second) { | |
log.Fatal("NATS server not ready") | |
} | |
js, err := jetStreamInit() | |
if err != nil { | |
log.Println(err) | |
return | |
} | |
// Let's assume that publisher and consumer are services running on different servers. | |
// So run publisher and consumer asynchronously to see how it works | |
var wg sync.WaitGroup | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
publishEVENTS(js) | |
}() | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
consumeEVENTS(js) | |
}() | |
wg.Wait() | |
log.Println("Exit...") | |
} | |
func publishEVENTS(js nats.JetStreamContext) { | |
for i := 0; i < 1000; i++ { | |
if _, err := js.Publish(StreamName+".test", []byte("test"), nats.MsgId("test")); err != nil { | |
log.Println(err) | |
} | |
time.Sleep(100 * time.Millisecond) | |
} | |
} | |
func consumeEVENTS(js nats.JetStreamContext) { | |
_, err := js.Subscribe(StreamSubjects, func(m *nats.Msg) { | |
if err := m.Ack(); err != nil { | |
log.Println("Unable to Ack", err) | |
return | |
} | |
log.Printf("Consumer => Subject: %s - Message: %s\n", m.Subject, m.Data) | |
}) | |
if err != nil { | |
log.Println("Subscribe failed") | |
return | |
} | |
} | |
func jetStreamInit() (nats.JetStreamContext, error) { | |
// Connect to NATS | |
nc, err := nats.Connect(nats.DefaultURL) | |
if err != nil { | |
return nil, err | |
} | |
// Create JetStream Context | |
js, err := nc.JetStream(nats.PublishAsyncMaxPending(256)) | |
if err != nil { | |
return nil, err | |
} | |
// Create a stream if it does not exist | |
err = createStream(js) | |
if err != nil { | |
return nil, err | |
} | |
return js, nil | |
} | |
func createStream(jetStream nats.JetStreamContext) error { | |
stream, err := jetStream.StreamInfo(StreamName) | |
if err != nil { | |
if err != nats.ErrStreamNotFound { | |
log.Fatalf("Unexpected error: %v", err) | |
} | |
} | |
if stream == nil { | |
log.Printf("Creating stream: %s\n", StreamName) | |
_, err = jetStream.AddStream(&nats.StreamConfig{ | |
Name: StreamName, | |
Subjects: []string{StreamSubjects}, | |
Duplicates: 5 * time.Second, | |
MaxAge: 10 * time.Second, | |
MaxMsgsPerSubject: 1, | |
Storage: nats.MemoryStorage, | |
}) | |
if err != nil { | |
return err | |
} | |
} | |
info, err := jetStream.StreamInfo(StreamName) | |
if err != nil { | |
log.Fatalf("Unexpected error: %v", err) | |
} | |
fmt.Printf("Stream info: %+v\n", info) | |
return nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment