Skip to content

Instantly share code, notes, and snippets.

@Allenxuxu
Created November 21, 2019 13:23
Show Gist options
  • Save Allenxuxu/af9a1b000c49e9f0ccca985504819259 to your computer and use it in GitHub Desktop.
Save Allenxuxu/af9a1b000c49e9f0ccca985504819259 to your computer and use it in GitHub Desktop.
Redis MQ(消费者/生产者模式) 单 list 版本
package redismq
import (
"encoding/json"
"time"
"github.com/go-redis/redis"
)
// RedisMQ is a first-in-first-out producer/consumer message queue built on a
// single Redis list.
type RedisMQ struct {
// client is the go-redis client used for every queue operation.
client *redis.Client
// mqTopic is the key of the Redis list that Push writes to and
// BlockSubUntil reads from.
mqTopic string
}
// protocol is the JSON envelope stored as one list element: Push marshals it
// and BlockSubUntil unmarshals it.
type protocol struct {
// Topic is the caller-supplied topic label carried with the message.
Topic string `json:"topic"`
// Content is the message payload.
Content string `json:"content"`
}
// New creates a RedisMQ backed by the Redis list named mqTopic.
//
// host is the TCP address of the Redis server and password its auth password
// (empty for none). The client always uses DB 0. The connection is verified
// with a PING before the queue is returned; if the PING fails, the client is
// closed and the PING error is returned.
func New(mqTopic, host, password string) (*RedisMQ, error) {
	client := redis.NewClient(&redis.Options{
		Network:      "tcp",
		Addr:         host,
		Password:     password,
		IdleTimeout:  240 * time.Second,
		MinIdleConns: 16,
		DB:           0, // use default DB
	})

	if _, err := client.Ping().Result(); err != nil {
		// Release the client's connection pool; without this every failed
		// New call would leak it. The Close error is intentionally dropped
		// because the PING failure is the more useful one to report.
		_ = client.Close()
		return nil, err
	}

	return &RedisMQ{
		client:  client,
		mqTopic: mqTopic,
	}, nil
}
// Push enqueues value, labelled with topic, at the head of the queue's list.
//
// The pair is wrapped in a JSON envelope before being sent with LPUSH. The
// returned integer is whatever LPUSH reports (per the Redis documentation,
// the list length after the push).
func (r *RedisMQ) Push(topic string, value string) (int64, error) {
	envelope := protocol{Topic: topic, Content: value}

	payload, err := json.Marshal(&envelope)
	if err != nil {
		return 0, err
	}

	pushed := r.client.LPush(r.mqTopic, payload)
	return pushed.Result()
}
// BlockSub waits for the next message by calling BlockSubUntil with a zero
// timeout and returns its topic and content.
func (r *RedisMQ) BlockSub() (topic, content string, err error) {
	topic, content, err = r.BlockSubUntil(0)
	return topic, content, err
}
// BlockSubUntil pops the next message from the tail of the queue's list using
// BRPOP with timeout t and decodes its JSON envelope.
//
// Any error from BRPOP or from JSON decoding is returned as-is. A reply that
// does not have exactly two elements yields empty strings and a nil error.
func (r *RedisMQ) BlockSubUntil(t time.Duration) (topic, content string, err error) {
	reply, popErr := r.client.BRPop(t, r.mqTopic).Result()
	switch {
	case popErr != nil:
		return "", "", popErr
	case len(reply) != 2:
		return "", "", nil
	}

	// The second element of the reply is the popped payload.
	var decoded protocol
	if decodeErr := json.Unmarshal([]byte(reply[1]), &decoded); decodeErr != nil {
		return "", "", decodeErr
	}
	return decoded.Topic, decoded.Content, nil
}
// clear removes every element of the Redis list stored under the key topic.
//
// It deletes the key outright: LTRIM with the range 0..-1 retains the whole
// list and therefore clears nothing. Deleting a key that does not exist is
// not an error.
func (r *RedisMQ) clear(topic string) error {
	return r.client.Del(topic).Err()
}
package redismq
import (
"log"
"strconv"
"sync"
"testing"
"time"
)
// topic is the Redis list key the tests in this file pass to New and clear.
const topic = "redisMQ/test1"
// TestNew checks that New succeeds against the Redis server at the
// hard-coded test address.
func TestNew(t *testing.T) {
	if _, err := New(topic, "172.25.20.95:6379", ""); err != nil {
		t.Fatal(err)
	}
}
// TestRedisMQ pushes four messages and checks that a concurrent consumer
// receives them in the order they were pushed, ending on the "stop" message.
//
// It needs the Redis server at the hard-coded test address.
func TestRedisMQ(t *testing.T) {
	mq, err := New(topic, "172.25.20.95:6379", "")
	if err != nil {
		t.Fatal(err)
	}
	if err := mq.clear(topic); err != nil {
		t.Fatal(err)
	}

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		// Deferred so wg.Wait below is released on every exit path.
		defer wg.Done()

		for i := 1; ; i++ {
			gotTopic, content, err := mq.BlockSub()
			if err != nil {
				// t.Fatal must only be called from the test goroutine, so
				// record the failure and stop consuming instead.
				t.Error(err)
				return
			}
			log.Println(gotTopic, content)

			// The fourth message is the sentinel that ends the consumer.
			if i == 4 && content == "stop" && gotTopic == "4" {
				return
			}
			if want := strconv.Itoa(i); content != want {
				t.Errorf("message %d: got content %q, want %q", i, content, want)
				return
			}
		}
	}()

	// Presumably gives the consumer time to start blocking before the
	// producer begins pushing.
	time.Sleep(time.Second)

	messages := []struct {
		topic   string
		content string
	}{
		{"1", "1"},
		{"2", "2"},
		{"3", "3"},
		{"4", "stop"},
	}
	for _, m := range messages {
		if _, err := mq.Push(m.topic, m.content); err != nil {
			t.Fatal(err)
		}
	}

	wg.Wait()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment