Skip to content

Instantly share code, notes, and snippets.

@glycerine
Forked from joshrotenberg/one_at_a_time.go
Created October 1, 2015 00:11
Show Gist options
  • Save glycerine/cbdd58e889b8805a7101 to your computer and use it in GitHub Desktop.
Save glycerine/cbdd58e889b8805a7101 to your computer and use it in GitHub Desktop.
package main
// Simple example showing how to process one message at a time with NSQ using the go-nsq client library.
// See the configuration in the var block below to experiment with different scenarios.
import (
"log"
"os"
"os/signal"
"strconv"
"syscall"
"time"
"github.com/nsqio/go-nsq"
)
// Tunables for the example; tweak these to try different scenarios.
var (
	// Where and what to publish/subscribe.
	nsqdTCPAddrs = []string{"localhost:4150"}
	topic        = "t"
	channel      = "c"

	// Workload size.
	numMessages  = 10 // how many messages the producer publishes
	numConsumers = 10 // how many independent consumers are started

	// Timing.
	msgTimeout = 7 * time.Second // how long to wait for a message to be processed (set as each consumer's MsgTimeout)
	jobRunTime = 6 * time.Second // simulated time a handler spends processing one message
)
// JobHandler is the message handler attached to a single consumer. It keeps a
// reference to that consumer so HandleMessage can stop it once a message has
// been processed.
type JobHandler struct {
	// consumerId identifies this handler's consumer in log output.
	consumerId int
	// timeToRun is how long HandleMessage sleeps to simulate work.
	timeToRun time.Duration
	// consumer is the consumer this handler is registered on.
	consumer *nsq.Consumer
}
// HandleMessage simulates processing one message: it sleeps for the handler's
// timeToRun, logs the message body, stops the handler's consumer and then
// finishes the message. It always returns nil.
func (h *JobHandler) HandleMessage(m *nsq.Message) error {
	// The important bit: turn off the automatic response so that we decide
	// when the other end is told this message is done.
	m.DisableAutoResponse()

	body := string(m.Body)

	// Pretend to work. If this runs longer than the configured MsgTimeout we
	// have a problem.
	time.Sleep(h.timeToRun)
	log.Printf("consumer %d: finished processing %s", h.consumerId, body)

	// Processing is complete: stop this consumer first, then tell nsqd the
	// message is finished.
	h.consumer.Stop()
	m.Finish()

	return nil
}
// main publishes numMessages messages to topic, starts numConsumers consumers
// on channel (each limited to one in-flight message), and then blocks until
// SIGINT or SIGTERM is received.
func main() {
	// NOTE: nothing in this example ever sends on or closes stopChan, so in
	// practice only a signal on termChan ends the program. It is kept as a
	// hook for experimenting with a programmatic shutdown.
	stopChan := make(chan bool)
	termChan := make(chan os.Signal, 1)
	signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)

	// produce some messages
	go func() {
		producer, err := nsq.NewProducer(nsqdTCPAddrs[0], nsq.NewConfig())
		if err != nil {
			log.Fatal(err)
		}

		// Message bodies are the decimal strings "1".."numMessages".
		bs := make([][]byte, 0, numMessages)
		for i := 1; i <= numMessages; i++ {
			bs = append(bs, []byte(strconv.Itoa(i)))
		}

		err = producer.MultiPublish(topic, bs)
		// The producer is only used for this one publish; stop it whether
		// or not the publish succeeded so it is not left running.
		producer.Stop()
		if err != nil {
			log.Fatal(err)
		}
		// Only report success once the publish is known to have worked.
		log.Println("producer: published", len(bs), "messages")
	}()

	// fire up some consumers to process the messages
	go func() {
		for i := 1; i <= numConsumers; i++ {
			cfg := nsq.NewConfig()
			cfg.MaxInFlight = 1 // one message at a time per consumer
			cfg.MsgTimeout = msgTimeout

			consumer, err := nsq.NewConsumer(topic, channel, cfg)
			if err != nil {
				log.Fatal(err)
			}

			// The handler needs the consumer so it can stop it after
			// processing a message.
			consumer.AddHandler(&JobHandler{timeToRun: jobRunTime, consumer: consumer, consumerId: i})

			if err := consumer.ConnectToNSQDs(nsqdTCPAddrs); err != nil {
				log.Fatal(err)
			}
		}
	}()

	// hang around so stuff can happen
	select {
	case <-termChan:
	case <-stopChan:
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment