Skip to content

Instantly share code, notes, and snippets.

@pluveto
Created November 7, 2021 13:09
Show Gist options
  • Save pluveto/f380362ec04cf4849e5bc9a79c751204 to your computer and use it in GitHub Desktop.
Save pluveto/f380362ec04cf4849e5bc9a79c751204 to your computer and use it in GitHub Desktop.
Go NSQ Encapsulation and Usage
package mq
import (
"encoding/json"
"fmt"
"time"
"github.com/nsqio/go-nsq"
"go.uber.org/zap"
)
type MessageQueueConfig struct {
NsqAddr string
NsqLookupdAddr string
EnableLookupd bool
SupportedTopics []string
}
type MessageQueue struct {
subscribers map[string]Subscriber
config MessageQueueConfig
producer *nsq.Producer
}
type Messagehandler func(v []byte) bool
// LinkedHandlerNode 第一个节点为头节点,Handler 必须为 nil
type LinkedHandlerNode struct {
Handler *Messagehandler
Index int
NextNode *LinkedHandlerNode
}
type Subscriber struct {
HandlerHeadNode *LinkedHandlerNode
Consumer *nsq.Consumer
Handler nsq.HandlerFunc
}
func createProducer(addr string) (producer *nsq.Producer, err error) {
zap.L().Debug("initProducer to " + addr)
config := nsq.NewConfig()
producer, err = nsq.NewProducer(addr, config)
return
}
func createConsumer(topic string, channel string, address string) (c *nsq.Consumer, err error) {
zap.L().Debug("initConsumer to " + topic + "/" + channel)
config := nsq.NewConfig()
config.LookupdPollInterval = 15 * time.Second
c, err = nsq.NewConsumer(topic, channel, config)
return
}
func NewMessageQueue(config MessageQueueConfig) (mq *MessageQueue, err error) {
zap.L().Debug("New message queue")
producer, err := createProducer(config.NsqAddr)
if err != nil {
return nil, err
}
subscribers := make(map[string]Subscriber)
for _, topic := range config.SupportedTopics {
nsq.Register(topic, "default")
consumer, err := createConsumer(topic, "default", config.NsqAddr)
if err != nil {
return nil, err
}
// 头节点不参与实际使用,所以 Index = -1
headNode := &LinkedHandlerNode{Index: -1}
hubHandler := nsq.HandlerFunc(func(message *nsq.Message) error {
// 循环链式调用各个 Handler
curNode := headNode.NextNode
// 当不存在任何用户定义的 Handler 时抛出警告
if(nil == curNode){
return fmt.Errorf("No handler provided!")
}
for nil != curNode {
msg := message.Body
zap.S().Debugf("handler[%v] for %v is invoked", curNode.Index, topic)
stop := (*curNode.Handler)(msg)
if stop {
zap.S().Debugf("the message has stopped spreading ")
break
}
curNode = curNode.NextNode
}
return nil
})
consumer.AddHandler(hubHandler)
subscribers[topic] = Subscriber{
Consumer: consumer,
HandlerHeadNode: headNode,
}
}
return &MessageQueue{
config: config,
producer: producer,
subscribers: subscribers,
}, nil
}
func (mq *MessageQueue) Run() {
for name, s := range mq.subscribers {
zap.L().Info("Run consumer for " + name)
if mq.config.EnableLookupd {
s.Consumer.ConnectToNSQLookupd(mq.config.NsqLookupdAddr)
} else {
s.Consumer.ConnectToNSQD(mq.config.NsqAddr)
}
}
}
func (mq *MessageQueue) IsTopicSupported(topic string) bool {
for _, v := range mq.config.SupportedTopics {
if v == topic {
return true
}
}
return false
}
// Pub 向消息队列发送一个消息
func (mq *MessageQueue) Pub(topic string, data interface{}) (err error) {
if !mq.IsTopicSupported(topic) {
err = fmt.Errorf("unsupported topic name: " + topic)
return
}
body, err := json.Marshal(data)
if err != nil {
return
}
zap.L().Info("Pub " + topic + " to mq. data = " + string(body))
return mq.producer.Publish(topic, body)
}
// Sub 从消息队列订阅一个消息
func (mq *MessageQueue) Sub(topic string, handler Messagehandler) (err error) {
if !mq.IsTopicSupported(topic) {
err = fmt.Errorf("unsupported topic name: " + topic)
return
}
zap.L().Info("Subscribe " + topic)
subscriber, ok := mq.subscribers[topic]
if !ok {
err = fmt.Errorf("No such topic: " + topic)
return
}
// 抵达最后一个有效链表节点
curNode := subscriber.HandlerHeadNode
for nil != curNode.NextNode {
curNode = curNode.NextNode
}
// 创建节点
curNode.NextNode = &LinkedHandlerNode{
Handler: &handler,
Index: 1 + curNode.Index,
NextNode: nil,
}
return
}
func main(){
m, err := mq.NewMessageQueue(mq.MessageQueueConfig{
NsqAddr: "127.0.0.1:4150",
NsqLookupdAddr: "127.0.0.1:4161",
SupportedTopics: []string{"hello"},
EnableLookupd: false,
})
if err != nil {
zap.L().Fatal("Message queue error: " + err.Error())
}
m.Sub("hello", func(resp []byte) bool {
zap.L().Info("S1 Got: " + string(resp))
return false
})
m.Sub("hello", func(resp []byte) bool {
zap.L().Info("S2 Got: " + string(resp))
return true
})
m.Sub("hello", func(resp []byte) bool {
zap.L().Info("S3 Got: " + string(resp))
return false
})
m.Run()
err = m.Pub("hello", "world")
if err != nil {
zap.L().Fatal("Message queue error: " + err.Error())
}
err = m.Pub("hello", "tom")
if err != nil {
zap.L().Fatal("Message queue error: " + err.Error())
}
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan
os.Exit(0)
}
@sko00o
Copy link

sko00o commented Mar 28, 2022

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment