Skip to content

Instantly share code, notes, and snippets.

@gatspy
Forked from ebenoist/consumer.go
Created March 1, 2020 16:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gatspy/04e2c5c054cefeaf277a56618bde7a99 to your computer and use it in GitHub Desktop.
Save gatspy/04e2c5c054cefeaf277a56618bde7a99 to your computer and use it in GitHub Desktop.
nsq consumer
package main
import (
"errors"
"log"
"os"
"os/signal"
"syscall"
"github.com/nsqio/go-nsq"
)
// NoopNSQLogger is a logger that throws away everything handed to it.
// The stock NSQ logger is useful while debugging, but its output does
// not match our structured JSON logs, so we inject this one through the
// logger hook that NSQ exposes.
type NoopNSQLogger struct{}

// Output implements the nsq logger interface. Both arguments are
// ignored and the returned error is always nil.
func (*NoopNSQLogger) Output(calldepth int, s string) error {
	return nil
}
// MessageHandler adheres to the nsq.Handler interface.
// This allows us to define our own custom handlers for
// our messages. Think of these handlers much like you would
// an http handler.
type MessageHandler struct{}

// HandleMessage is the only method required to fulfill the
// nsq.Handler interface. This is where the message handling logic lives.
//
// It returns an error when the message body is empty, which results in
// the message being re-enqueued (a REQ is sent to nsqd). Otherwise the
// body is logged and nil is returned, which signals to the consumer that
// the message was handled successfully (a FIN is sent to nsqd).
func (h *MessageHandler) HandleMessage(m *nsq.Message) error {
	if len(m.Body) == 0 {
		// Returning an error results in the message being re-enqueued.
		return errors.New("body is blank re-enqueue message")
	}

	// m.Body is a byte slice: format it with %s so the log contains the
	// message text. Passing the slice straight to log.Print would print
	// it as a list of decimal byte values instead.
	log.Printf("%s", m.Body)

	return nil
}
// main builds an NSQ consumer for the "clicks" topic on the "metrics"
// channel, registers the message handlers, connects through nsqlookupd
// and then blocks until the consumer has stopped, either on its own or
// after a shutdown signal.
func main() {
	// The default config settings provide a pretty good starting point for
	// our new consumer.
	config := nsq.NewConfig()

	// Create a new consumer with the name of our topic, the channel, and our config.
	consumer, err := nsq.NewConsumer("clicks", "metrics", config)
	if err != nil {
		log.Fatal(err) // If there is an error, halt the application.
	}

	// Set the number of messages that can be in flight at any given time.
	// You'll want to set this number as the default is only 1. This can be
	// a major concurrency knob that can change the performance of your application.
	consumer.ChangeMaxInFlight(200)

	// Here we set the logger to our NoopNSQLogger to quiet down the default logs.
	// At Reverb we use a custom structured logging format so we'll take the logging
	// from here.
	consumer.SetLogger(&NoopNSQLogger{}, nsq.LogLevelError)

	// Injects our handler into the consumer. You'll define one handler
	// per consumer, but you can have as many concurrently running handlers
	// as specified by the second argument. If your MaxInFlight is less
	// than your number of concurrent handlers you'll starve your workers
	// as there will never be enough in flight messages for your worker pool.
	consumer.AddConcurrentHandlers(&MessageHandler{}, 20)

	// Our consumer will discover where topics are located by our three
	// nsqlookupd instances. The application will periodically poll
	// these nsqlookupd instances to discover new nodes or drop unhealthy
	// producers.
	//
	// Lookupd addresses are given as host:port; 4161 is the default
	// nsqlookupd HTTP port. An address without a port is expected to be
	// rejected by go-nsq ("missing port") - confirm against the pinned
	// version.
	nsqlds := []string{
		"nsqlookupd1.local:4161",
		"nsqlookupd2.local:4161",
		"nsqlookupd3.local:4161",
	}
	if err := consumer.ConnectToNSQLookupds(nsqlds); err != nil {
		log.Fatal(err)
	}

	// Let's allow our queues to drain properly during shutdown.
	// We listen for SIGINT (Ctrl+C) and SIGTERM (what process managers
	// usually send) so that either one triggers a graceful shutdown.
	shutdown := make(chan os.Signal, 1)
	signal.Notify(shutdown, syscall.SIGINT, syscall.SIGTERM)

	// Block until either the consumer dies or our application is signaled
	// to stop.
	select {
	case <-consumer.StopChan:
		return // uh oh consumer disconnected. Time to quit.
	case <-shutdown:
		// Stop only initiates the graceful shutdown; it does not wait for
		// it to finish.
		consumer.Stop()
	}

	// Wait for the consumer to report that it has fully stopped before
	// falling out of main.
	<-consumer.StopChan
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment