Skip to content

Instantly share code, notes, and snippets.

@PumpkinSeed
Last active July 26, 2017 16:49
Show Gist options
  • Save PumpkinSeed/8271e7b972f485f81df82e5e8c13387a to your computer and use it in GitHub Desktop.
Save PumpkinSeed/8271e7b972f485f81df82e5e8c13387a to your computer and use it in GitHub Desktop.
NSQ input handler for heurelog
package input
import (
"sync"
"fmt"
"github.com/PumpkinSeed/heurelog/config"
"github.com/PumpkinSeed/heurelog/handlers/output"
"github.com/Sirupsen/logrus"
nsq "github.com/nsqio/go-nsq"
)
type NSQ struct {
C config.Interface
Output output.Interface
}
func NewNSQ(cnf config.Interface, out output.Interface) (*NSQ, error) {
return &NSQ{
C: cnf,
Output: out,
}, nil
}
func (n *NSQ) Handle() {
conf := n.C.GetInput().NSQ
log := logrus.New()
wg := &sync.WaitGroup{}
wg.Add(1)
config := nsq.NewConfig()
q, _ := nsq.NewConsumer(conf.Topic, conf.Channel, config)
q.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
log.Printf("Got a message: %s", string(message.Body))
n.Output.Write(message.Body)
return nil
}))
err := q.ConnectToNSQD(fmt.Sprintf("%s:%s", conf.Host, conf.Port))
if err != nil {
log.Panic("Could not connect")
}
wg.Wait()
log.Info("NSQ server started to consume")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment