Created
November 21, 2019 13:23
-
-
Save Allenxuxu/af9a1b000c49e9f0ccca985504819259 to your computer and use it in GitHub Desktop.
Redis MQ(消费者/生产者模式) 单 list 版本
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package redismq | |
import ( | |
"encoding/json" | |
"time" | |
"github.com/go-redis/redis" | |
) | |
// RedisMQ Redis list 封装的消费者/生产者模式 先进先出 MQ | |
type RedisMQ struct { | |
client *redis.Client | |
mqTopic string | |
} | |
// protocol is the JSON envelope stored in the Redis list for each message.
type protocol struct {
	// Topic is the caller-supplied topic label carried with the message.
	Topic string `json:"topic"`
	// Content is the message payload.
	Content string `json:"content"`
}
// New RedisMQ | |
func New(mqTopic, host, password string) (*RedisMQ, error) { | |
client := redis.NewClient(&redis.Options{ | |
Network: "tcp", | |
Addr: host, | |
Password: password, | |
IdleTimeout: 240 * time.Second, | |
MinIdleConns: 16, | |
DB: 0, // use default DB | |
}) | |
_, err := client.Ping().Result() | |
if err != nil { | |
return nil, err | |
} | |
return &RedisMQ{ | |
client: client, | |
mqTopic: mqTopic, | |
}, nil | |
} | |
// Push 发送到 指定 topic | |
func (r *RedisMQ) Push(topic string, value string) (int64, error) { | |
data, err := json.Marshal(&protocol{ | |
Topic: topic, | |
Content: value, | |
}) | |
if err != nil { | |
return 0, err | |
} | |
return r.client.LPush(r.mqTopic, data).Result() | |
} | |
// BlockSub 阻塞等待消息 | |
func (r *RedisMQ) BlockSub() (topic, content string, err error) { | |
return r.BlockSubUntil(0) | |
} | |
// BlockSubUntil ... | |
func (r *RedisMQ) BlockSubUntil(t time.Duration) (topic, content string, err error) { | |
ret, err := r.client.BRPop(t, r.mqTopic).Result() | |
if err != nil { | |
return "", "", err | |
} | |
if len(ret) != 2 { | |
return "", "", nil | |
} | |
var msg protocol | |
if err := json.Unmarshal([]byte(ret[1]), &msg); err != nil { | |
return "", "", err | |
} | |
return msg.Topic, msg.Content, nil | |
} | |
func (r *RedisMQ) clear(topic string) error { | |
return r.client.LTrim(topic, 0, -1).Err() | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package redismq | |
import ( | |
"log" | |
"strconv" | |
"sync" | |
"testing" | |
"time" | |
) | |
// topic is the Redis list key shared by the tests in this file.
const topic = "redisMQ/test1"
func TestNew(t *testing.T) { | |
_, err := New(topic, "172.25.20.95:6379", "") | |
if err != nil { | |
t.Fatal(err) | |
} | |
} | |
func TestRedisMQ(t *testing.T) { | |
mq, err := New(topic, "172.25.20.95:6379", "") | |
if err != nil { | |
t.Fatal(err) | |
} | |
if err := mq.clear(topic); err != nil { | |
t.Fatal(err) | |
} | |
wg := sync.WaitGroup{} | |
wg.Add(1) | |
go func() { | |
var i int | |
for { | |
i++ | |
topic, content, err := mq.BlockSub() | |
if err != nil { | |
t.Fatal(err) | |
} | |
log.Println(topic, content) | |
if i == 4 && content == "stop" && topic == "4" { | |
break | |
} | |
if content != strconv.Itoa(i) { | |
t.Fatal() | |
} | |
} | |
wg.Done() | |
}() | |
time.Sleep(time.Second) | |
_, err = mq.Push("1", "1") | |
if err != nil { | |
t.Fatal(err) | |
} | |
_, err = mq.Push("2", "2") | |
if err != nil { | |
t.Fatal(err) | |
} | |
_, err = mq.Push("3", "3") | |
if err != nil { | |
t.Fatal(err) | |
} | |
_, err = mq.Push("4", "stop") | |
if err != nil { | |
t.Fatal(err) | |
} | |
wg.Wait() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment