Last active
April 9, 2017 06:00
-
-
Save XUJiahua/a1576b214c3f3f385ccf22379c5f227c to your computer and use it in GitHub Desktop.
redis假连接问题
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// push.go | |
package push | |
import ( | |
"github.com/mediocregopher/radix.v2/pool" | |
) | |
func listenOnQueue() { | |
p, err := pool.NewCustom("tcp", "address", 5, queue.REDIS_CUSTOM_CONN_FACTORY) | |
if err != nil { | |
panic(err) | |
} | |
// 监听Redis queue中的消息 | |
for { | |
// 堵塞读redis | |
payload, message, err := queue.DequeuePayload(p) | |
if err != nil && err != queue.RedisTimeoutErr { | |
log.Error(err.Error()) | |
log.Error("payload: ", message) | |
continue | |
}else if err != nil && err == queue.RedisTimeoutErr { | |
log.Debug(err.Error()) | |
continue | |
}else { | |
log.Info("payload: ", message) | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// queue.go | |
package queue | |
import (
	"encoding/json"
	"errors"
	"time"

	"github.com/mediocregopher/radix.v2/pool"
	"github.com/mediocregopher/radix.v2/redis"
)
const TIMEOUT = 10 | |
var RedisTimeoutErr = errors.New("BRPOP timeout") | |
var REDIS_CUSTOM_CONN_FACTORY pool.DialFunc | |
func init() { | |
REDIS_CUSTOM_CONN_FACTORY = func(network, addr string) (*redis.Client, error) { | |
client, err := redis.Dial(network, addr) | |
if err != nil { | |
return nil, err | |
} | |
if err = client.Cmd("AUTH", "password").Err; err != nil { | |
client.Close() | |
return nil, err | |
} | |
if err = client.Cmd("SELECT", 5).Err; err != nil { | |
client.Close() | |
return nil, err | |
} | |
return client, nil | |
} | |
} | |
// Dequeue 从队列取一条消息 | |
func Dequeue(p *pool.Pool) (message string, err error) { | |
// 获取连接未堵塞 | |
log.Debug("[dead conn] wait to get a client") | |
client, err := p.Get() | |
if err != nil { | |
return | |
} | |
log.Debug("[dead conn] got a client") | |
defer p.Put(client) | |
log.Debug("[dead conn] wait to get a message") | |
// 如果队列是空的,阻塞读 | |
// http://redis.io/commands/brpop | |
resp := client.Cmd("BRPOP", QUEUE_NAME, TIMEOUT) | |
// 实际观测,Cmd方法堵塞了,即使指定了TIMEOUT,TIMEOUT是redis指令层面的参数,并不是针对连接本身 | |
log.Debug("[dead conn] got a message") | |
if resp.IsType(redis.Nil){ | |
err = RedisTimeoutErr | |
return | |
} | |
arr, err := resp.Array() | |
if err != nil { | |
return | |
} | |
return arr[1].Str() | |
} | |
func DequeuePayload(p *pool.Pool) (payload Payload, message string, err error) { | |
message, err = Dequeue(p) | |
if err != nil { | |
return | |
} | |
err = json.Unmarshal([]byte(message), &payload) | |
return | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment