Skip to content

Instantly share code, notes, and snippets.

@Aneesh540
Created September 27, 2021 17:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Aneesh540/76ef6e8485a518f7ddfff4589a6ce94f to your computer and use it in GitHub Desktop.
Save Aneesh540/76ef6e8485a518f7ddfff4589a6ce94f to your computer and use it in GitHub Desktop.
package main
import (
	"encoding/json"
	"log"
	"time"

	nsq "github.com/nsqio/go-nsq"
)
// consumerChannel carries decoded message payloads from handleMessage to the
// consumeMessage goroutine. It is nil until InitNsqConsumer allocates it.
var consumerChannel chan interface{}
// AddMongoBulk receives one batch of decoded messages that should be written
// to MongoDB in a single bulk operation. The body is a placeholder to be
// filled in with the preprocessing and bulk-insert logic.
func AddMongoBulk(data []interface{}) {
	// your code for preprocessing and adding new value in mongoDB in bulk
}
// handleMessage is the NSQ message handler. It decodes the JSON body of msg
// and forwards the decoded value to consumerChannel for batching.
//
// A body that fails to decode is logged and dropped: nil is still returned so
// the message is acknowledged rather than requeued, because the same bytes
// would fail to decode again on every retry.
var handleMessage = func(msg *nsq.Message) error {
	var data interface{}
	if err := json.Unmarshal(msg.Body, &data); err != nil {
		log.Printf("dropping message with invalid JSON body: %v", err)
		return nil
	}
	// Blocks while the channel buffer is full, until consumeMessage receives.
	consumerChannel <- data
	return nil
}
func consumeMessage(){
interval := time.NewTicker(time.Second * 10) // ticker of every 10 second
threshold := 1000
bulkArray = make([]interface, 0)
for {
select {
case <- interval.C:
AddMongoBulk(bulkArray)
bulkArray = nil
case msg := <-consumerChannel:
bulkArray = append(bulkArray, msg)
if len(bulkArray) >= threshold { // pushing 1000 messages to MongoDB at a time
AddMongoBulk(bulkArray)
bulkArray = nil
}
}
}
}
func InitNsqConsumer(){
consumerChannel = make(chan interface, 1) // channel of buffer 1, if buffer = 0, it will be a blocking channel
go consumeMessage()
// Below code is same as Method #1(inefficient) solution
nsqTopic := "dummy_topic"
nsqChannel := "dummy_channel"
consumer, err := nsq.NewConsumer(nsqTopic, nsqChannel, nsq.NewConfig())
consumer.AddHandler(nsq.HandleFunc(handleMessage))
err = consumer.ConnectToNSQLookupd("127.0.0.1:4161")
if err != nil{
log.Print("error connecting to nsqlookupd")
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment