Skip to content

Instantly share code, notes, and snippets.

@shijuvar
Last active October 14, 2017 13:11
Show Gist options
  • Save shijuvar/c16532e3a8ded030da793846d3f4b4a9 to your computer and use it in GitHub Desktop.
Save shijuvar/c16532e3a8ded030da793846d3f4b4a9 to your computer and use it in GitHub Desktop.
A NATS Streaming client that subscribes messages with a QueueGroup
package main
import (
"encoding/json"
"log"
"runtime"
stan "github.com/nats-io/go-nats-streaming"
"github.com/shijuvar/gokit/examples/nats-streaming/pb"
"github.com/shijuvar/gokit/examples/nats-streaming/store"
)
const (
clusterID = "test-cluster"
clientID = "order-query-store1"
channel = "order-notification"
durableID = "store-durable"
queueGroup = "order-query-store-group"
)
func main() {
// Connect to NATS Streaming server
sc, err := stan.Connect(
clusterID,
clientID,
stan.NatsURL(stan.DefaultNatsURL),
)
if err != nil {
log.Fatal(err)
}
sc.QueueSubscribe(channel, queueGroup, func(msg *stan.Msg) {
order := pb.Order{}
err := json.Unmarshal(msg.Data, &order)
if err == nil {
// Handle the message
log.Printf("Subscribed message from clientID - %s: %+v\n", clientID, order)
queryStore := store.QueryStore{}
// Perform data replication for query model into CockroachDB
err := queryStore.SyncOrderQueryModel(order)
if err != nil {
log.Printf("Error while replicating the query model %+v", err)
}
}
}, stan.DurableName(durableID),
)
runtime.Goexit()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment