Skip to content

Instantly share code, notes, and snippets.

@linnv
Last active July 11, 2016 02:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save linnv/1a73890fc699aab4ef091c4781beff63 to your computer and use it in GitHub Desktop.
Save linnv/1a73890fc699aab4ef091c4781beff63 to your computer and use it in GitHub Desktop.
Redis pub/sub demo implemented in Go
package main
import (
"encoding/json"
"fmt"
"log"
"time"
"gopkg.in/redis.v2"
)
// Connection settings and cross-goroutine signalling shared by the demo.
var (
	// redisAddr is the host of the Redis server; callers append the port
	// themselves when building the dial address.
	// Alternative host used previously: "192.168.100.61"
	redisAddr = "192.168.9.202"

	// startSub is sent on by the subscriber once it has subscribed, and
	// received from by the publisher before it starts publishing. The buffer
	// of one lets the subscriber signal without waiting for the publisher.
	startSub = make(chan bool, 1)
)

// channel is the name of the Redis pub/sub channel used by both sides.
const channel = "pubc1"
// main runs the pub/sub demo: it starts one publisher and one subscriber
// goroutine, lets them exchange messages for one second, asks both to stop by
// closing done, and then waits for both to return before exiting.
func main() {
	done := make(chan struct{})

	// finished receives one token per worker when it returns. It is buffered
	// to the number of workers so a worker never blocks while reporting.
	const workers = 2
	finished := make(chan struct{}, workers)

	go func() {
		defer func() { finished <- struct{}{} }()
		redisv4Pub("one", done)
	}()
	go func() {
		defer func() { finished <- struct{}{} }()
		redisv4Sub(done)
	}()

	time.Sleep(time.Second * 1)
	close(done)

	// Returning from main ends the process immediately, without waiting for
	// other goroutines. Wait here so the workers can observe done, run their
	// deferred Close calls and print their exit messages.
	for i := 0; i < workers; i++ {
		<-finished
	}
	println("exiting")
}
// Notification is the message payload exchanged over the Redis channel.
// It is encoded to JSON by the publisher and decoded by the subscriber.
type Notification struct {
	// Id is the publisher's running message counter.
	Id int `json:"Id"`
	// Time is the moment the publisher created the message; the subscriber
	// uses it to report how long delivery took.
	Time time.Time `json:"Time"`
	// Extra is an optional free-form payload; the demo publisher leaves it unset.
	Extra interface{} `json:"Extra"`
}
// redisv4Pub publishes JSON-encoded Notification values on channel in a tight
// loop until done is closed.
//
// Before connecting it waits for the subscriber's signal on startSub, so that
// publishing only begins after the subscriber has issued its subscription. If
// done is closed while it is still waiting, it returns without publishing.
//
// index is accepted for the caller's benefit but is not used here.
//
// It panics if the server cannot be pinged, if a Notification cannot be
// marshalled, or if a publish fails.
func redisv4Pub(index string, done chan struct{}) {
	//let sub run first
	println("pub waiting signal of starting sub")
	select {
	case <-startSub:
	case <-done:
		// Shutdown was requested before the subscriber became ready; do not
		// stay blocked on a signal that may never arrive.
		println("exit pub")
		return
	}
	println("pub receivd signal of starting sub, start pub")

	redisClient := redis.NewTCPClient(&redis.Options{
		Addr:     fmt.Sprintf("%s:%d", redisAddr, 6379),
		Password: "",
		DB:       0,
	})
	defer redisClient.Close()

	if err := redisClient.Ping().Err(); err != nil {
		// Keep the underlying cause so a failed demo run is diagnosable.
		panic("network error: " + err.Error())
	}

	count := 0
	for {
		count++
		data := &Notification{
			Id:   count,
			Time: time.Now(),
		}
		bs, err := json.Marshal(data)
		if err != nil {
			panic(err.Error())
		}
		if err := redisClient.Publish(channel, string(bs)).Err(); err != nil {
			panic("pub error: " + err.Error())
		}

		// Non-blocking check: keep publishing until done is closed.
		select {
		case <-done:
			println("exit pub")
			return
		default:
		}
	}
}
// redisv4Sub subscribes to channel, signals the publisher through startSub,
// and then logs every Notification it receives together with the time elapsed
// since the message was created.
//
// Each receive waits at most two seconds. A timeout is logged and the loop
// carries on; any other receive error is logged and ends the subscriber. done
// is checked after every receive attempt, so shutdown is noticed within one
// receive timeout.
//
// It panics if the server cannot be pinged, if subscribing fails, or if a
// message payload is not valid Notification JSON.
func redisv4Sub(done chan struct{}) {
	println("sub send starting signal")
	redisClient := redis.NewTCPClient(&redis.Options{
		Addr:     fmt.Sprintf("%s:%d", redisAddr, 6379),
		Password: "",
		DB:       0,
	})
	// Release the client's connections on return, as the publisher does.
	defer redisClient.Close()

	if err := redisClient.Ping().Err(); err != nil {
		// Keep the underlying cause so a failed demo run is diagnosable.
		panic("network error: " + err.Error())
	}

	pubsub := redisClient.PubSub()
	defer pubsub.Close()
	if err := pubsub.Subscribe(channel); err != nil {
		// Fail loudly rather than telling the publisher to start when
		// nothing is subscribed.
		panic("subscribe error: " + err.Error())
	}
	startSub <- true
	println("sub sent starting signal,start sub")

	for {
		msg, err := pubsub.ReceiveTimeout(time.Second * 2)
		if err != nil {
			// Only a read timeout is expected here. Anything else (for
			// example a dropped connection) would otherwise fail again on
			// every iteration, so stop instead of spinning.
			if te, ok := err.(interface{ Timeout() bool }); ok && te.Timeout() {
				log.Printf("read timeout %d of redis sub", 2)
			} else {
				log.Printf("redis sub receive error: %v", err)
				return
			}
		}

		// msg is nil when the receive failed, so neither case matches then.
		switch v := msg.(type) {
		case *redis.Message:
			// Decode into a fresh value so fields from an earlier message
			// cannot leak into this one.
			data := &Notification{}
			if err := json.Unmarshal([]byte(v.Payload), data); err != nil {
				panic(err.Error())
			}
			log.Printf("sub data: %+v\n", data)
			fmt.Printf("time elapse: %+v\n", time.Since(data.Time))
		case *redis.Subscription:
			fmt.Printf("%s: %s %d\n", v.Channel, v.Kind, v.Count)
		}

		// Non-blocking check: keep receiving until done is closed.
		select {
		case <-done:
			println("exit sub")
			return
		default:
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment