Skip to content

Instantly share code, notes, and snippets.

@XUJiahua
Last active April 9, 2017 06:00
Show Gist options
  • Save XUJiahua/a1576b214c3f3f385ccf22379c5f227c to your computer and use it in GitHub Desktop.
Save XUJiahua/a1576b214c3f3f385ccf22379c5f227c to your computer and use it in GitHub Desktop.
redis假连接问题
// push.go
package push
import (
"github.com/mediocregopher/radix.v2/pool"
)
func listenOnQueue() {
p, err := pool.NewCustom("tcp", "address", 5, queue.REDIS_CUSTOM_CONN_FACTORY)
if err != nil {
panic(err)
}
// 监听Redis queue中的消息
for {
// 堵塞读redis
payload, message, err := queue.DequeuePayload(p)
if err != nil && err != queue.RedisTimeoutErr {
log.Error(err.Error())
log.Error("payload: ", message)
continue
}else if err != nil && err == queue.RedisTimeoutErr {
log.Debug(err.Error())
continue
}else {
log.Info("payload: ", message)
}
}
}
// queue.go
package queue
import (
"github.com/mediocregopher/radix.v2/pool"
"encoding/json"
"github.com/mediocregopher/radix.v2/redis"
"errors"
)
const TIMEOUT = 10
var RedisTimeoutErr = errors.New("BRPOP timeout")
var REDIS_CUSTOM_CONN_FACTORY pool.DialFunc
func init() {
REDIS_CUSTOM_CONN_FACTORY = func(network, addr string) (*redis.Client, error) {
client, err := redis.Dial(network, addr)
if err != nil {
return nil, err
}
if err = client.Cmd("AUTH", "password").Err; err != nil {
client.Close()
return nil, err
}
if err = client.Cmd("SELECT", 5).Err; err != nil {
client.Close()
return nil, err
}
return client, nil
}
}
// Dequeue 从队列取一条消息
func Dequeue(p *pool.Pool) (message string, err error) {
// 获取连接未堵塞
log.Debug("[dead conn] wait to get a client")
client, err := p.Get()
if err != nil {
return
}
log.Debug("[dead conn] got a client")
defer p.Put(client)
log.Debug("[dead conn] wait to get a message")
// 如果队列是空的,阻塞读
// http://redis.io/commands/brpop
resp := client.Cmd("BRPOP", QUEUE_NAME, TIMEOUT)
// 实际观测,Cmd方法堵塞了,即使指定了TIMEOUT,TIMEOUT是redis指令层面的参数,并不是针对连接本身
log.Debug("[dead conn] got a message")
if resp.IsType(redis.Nil){
err = RedisTimeoutErr
return
}
arr, err := resp.Array()
if err != nil {
return
}
return arr[1].Str()
}
func DequeuePayload(p *pool.Pool) (payload Payload, message string, err error) {
message, err = Dequeue(p)
if err != nil {
return
}
err = json.Unmarshal([]byte(message), &payload)
return
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment