Created
November 7, 2021 13:09
-
-
Save pluveto/f380362ec04cf4849e5bc9a79c751204 to your computer and use it in GitHub Desktop.
Go NSQ Encapsulation and Usage
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package mq | |
import ( | |
"encoding/json" | |
"fmt" | |
"time" | |
"github.com/nsqio/go-nsq" | |
"go.uber.org/zap" | |
) | |
type MessageQueueConfig struct { | |
NsqAddr string | |
NsqLookupdAddr string | |
EnableLookupd bool | |
SupportedTopics []string | |
} | |
type MessageQueue struct { | |
subscribers map[string]Subscriber | |
config MessageQueueConfig | |
producer *nsq.Producer | |
} | |
type Messagehandler func(v []byte) bool | |
// LinkedHandlerNode 第一个节点为头节点,Handler 必须为 nil | |
type LinkedHandlerNode struct { | |
Handler *Messagehandler | |
Index int | |
NextNode *LinkedHandlerNode | |
} | |
type Subscriber struct { | |
HandlerHeadNode *LinkedHandlerNode | |
Consumer *nsq.Consumer | |
Handler nsq.HandlerFunc | |
} | |
func createProducer(addr string) (producer *nsq.Producer, err error) { | |
zap.L().Debug("initProducer to " + addr) | |
config := nsq.NewConfig() | |
producer, err = nsq.NewProducer(addr, config) | |
return | |
} | |
func createConsumer(topic string, channel string, address string) (c *nsq.Consumer, err error) { | |
zap.L().Debug("initConsumer to " + topic + "/" + channel) | |
config := nsq.NewConfig() | |
config.LookupdPollInterval = 15 * time.Second | |
c, err = nsq.NewConsumer(topic, channel, config) | |
return | |
} | |
func NewMessageQueue(config MessageQueueConfig) (mq *MessageQueue, err error) { | |
zap.L().Debug("New message queue") | |
producer, err := createProducer(config.NsqAddr) | |
if err != nil { | |
return nil, err | |
} | |
subscribers := make(map[string]Subscriber) | |
for _, topic := range config.SupportedTopics { | |
nsq.Register(topic, "default") | |
consumer, err := createConsumer(topic, "default", config.NsqAddr) | |
if err != nil { | |
return nil, err | |
} | |
// 头节点不参与实际使用,所以 Index = -1 | |
headNode := &LinkedHandlerNode{Index: -1} | |
hubHandler := nsq.HandlerFunc(func(message *nsq.Message) error { | |
// 循环链式调用各个 Handler | |
curNode := headNode.NextNode | |
// 当不存在任何用户定义的 Handler 时抛出警告 | |
if(nil == curNode){ | |
return fmt.Errorf("No handler provided!") | |
} | |
for nil != curNode { | |
msg := message.Body | |
zap.S().Debugf("handler[%v] for %v is invoked", curNode.Index, topic) | |
stop := (*curNode.Handler)(msg) | |
if stop { | |
zap.S().Debugf("the message has stopped spreading ") | |
break | |
} | |
curNode = curNode.NextNode | |
} | |
return nil | |
}) | |
consumer.AddHandler(hubHandler) | |
subscribers[topic] = Subscriber{ | |
Consumer: consumer, | |
HandlerHeadNode: headNode, | |
} | |
} | |
return &MessageQueue{ | |
config: config, | |
producer: producer, | |
subscribers: subscribers, | |
}, nil | |
} | |
func (mq *MessageQueue) Run() { | |
for name, s := range mq.subscribers { | |
zap.L().Info("Run consumer for " + name) | |
if mq.config.EnableLookupd { | |
s.Consumer.ConnectToNSQLookupd(mq.config.NsqLookupdAddr) | |
} else { | |
s.Consumer.ConnectToNSQD(mq.config.NsqAddr) | |
} | |
} | |
} | |
func (mq *MessageQueue) IsTopicSupported(topic string) bool { | |
for _, v := range mq.config.SupportedTopics { | |
if v == topic { | |
return true | |
} | |
} | |
return false | |
} | |
// Pub 向消息队列发送一个消息 | |
func (mq *MessageQueue) Pub(topic string, data interface{}) (err error) { | |
if !mq.IsTopicSupported(topic) { | |
err = fmt.Errorf("unsupported topic name: " + topic) | |
return | |
} | |
body, err := json.Marshal(data) | |
if err != nil { | |
return | |
} | |
zap.L().Info("Pub " + topic + " to mq. data = " + string(body)) | |
return mq.producer.Publish(topic, body) | |
} | |
// Sub 从消息队列订阅一个消息 | |
func (mq *MessageQueue) Sub(topic string, handler Messagehandler) (err error) { | |
if !mq.IsTopicSupported(topic) { | |
err = fmt.Errorf("unsupported topic name: " + topic) | |
return | |
} | |
zap.L().Info("Subscribe " + topic) | |
subscriber, ok := mq.subscribers[topic] | |
if !ok { | |
err = fmt.Errorf("No such topic: " + topic) | |
return | |
} | |
// 抵达最后一个有效链表节点 | |
curNode := subscriber.HandlerHeadNode | |
for nil != curNode.NextNode { | |
curNode = curNode.NextNode | |
} | |
// 创建节点 | |
curNode.NextNode = &LinkedHandlerNode{ | |
Handler: &handler, | |
Index: 1 + curNode.Index, | |
NextNode: nil, | |
} | |
return | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
func main(){ | |
m, err := mq.NewMessageQueue(mq.MessageQueueConfig{ | |
NsqAddr: "127.0.0.1:4150", | |
NsqLookupdAddr: "127.0.0.1:4161", | |
SupportedTopics: []string{"hello"}, | |
EnableLookupd: false, | |
}) | |
if err != nil { | |
zap.L().Fatal("Message queue error: " + err.Error()) | |
} | |
m.Sub("hello", func(resp []byte) bool { | |
zap.L().Info("S1 Got: " + string(resp)) | |
return false | |
}) | |
m.Sub("hello", func(resp []byte) bool { | |
zap.L().Info("S2 Got: " + string(resp)) | |
return true | |
}) | |
m.Sub("hello", func(resp []byte) bool { | |
zap.L().Info("S3 Got: " + string(resp)) | |
return false | |
}) | |
m.Run() | |
err = m.Pub("hello", "world") | |
if err != nil { | |
zap.L().Fatal("Message queue error: " + err.Error()) | |
} | |
err = m.Pub("hello", "tom") | |
if err != nil { | |
zap.L().Fatal("Message queue error: " + err.Error()) | |
} | |
sigChan := make(chan os.Signal, 1) | |
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) | |
<-sigChan | |
os.Exit(0) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
https://gist.github.com/pluveto/f380362ec04cf4849e5bc9a79c751204#file-mq-go-L63 这句多余的。