Skip to content

Instantly share code, notes, and snippets.

@chlp
Created January 23, 2022 10:14
Show Gist options
  • Save chlp/9c6ffed0a3462c94722e54f5c417fa08 to your computer and use it in GitHub Desktop.
Save chlp/9c6ffed0a3462c94722e54f5c417fa08 to your computer and use it in GitHub Desktop.
nats simple reader
package main
import (
"github.com/nats-io/nats.go"
"github.com/nats-io/stan.go"
"os"
"time"
)
func main() {
subscribe("nats-url-1.ru,nats-url-2.ru", "nats-cluster", "topic")
subscribe("nats-url-1.ru,nats-url-2.ru", "nats-cluster", "topic2")
time.Sleep(48 * time.Hour)
}
func subscribe(url string, cluster string, subject string) {
clientId := "aleksei-rytikov-test-" + subject
ncOpts := []nats.Option{
nats.Name(clientId),
nats.MaxReconnects(-1),
nats.ReconnectBufSize(-1),
nats.UserInfo("login", "password"),
}
nc, err := nats.Connect(url, ncOpts...)
if err != nil {
println("nats connect err, url", url, "cluster", cluster, err.Error())
os.Exit(1)
}
sc, err := stan.Connect(cluster, clientId, stan.NatsConn(nc))
go func() {
for {
println("Check status. Url: ", url, ". Cluster: ", cluster, ". Subject: ", subject, ". Status: ", sc.NatsConn().Status())
time.Sleep(60 * time.Second)
}
}()
if err != nil {
println("stan connect err, url", url, "cluster", cluster, err.Error())
os.Exit(1)
}
_, err = sc.Subscribe(subject, func(m *stan.Msg) {
println("Message. Url: ", url, ". Cluster: ", cluster, ". Subject: ", subject, ". Timestamp: ", m.Timestamp, ". Data: ", string(m.Data))
}, stan.DurableName(clientId))
if err != nil {
println("subs err, url", url, "cluster", cluster, err.Error())
os.Exit(1)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment