Last active
July 11, 2016 02:29
-
-
Save linnv/1a73890fc699aab4ef091c4781beff63 to your computer and use it in GitHub Desktop.
Redis pub/sub demo implemented in Go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"encoding/json" | |
"fmt" | |
"log" | |
"time" | |
"gopkg.in/redis.v2" | |
) | |
// Connection settings and coordination state shared by the publisher and the
// subscriber goroutines.
var (
	// redisAddr is the host of the Redis server; callers append the port when
	// they build the connection address.
	// Alternate host kept for reference: "192.168.100.61".
	redisAddr = "192.168.9.202"

	// startSub carries the "subscriber is ready" signal from the subscriber to
	// the publisher. The buffer of one lets the subscriber send the signal
	// without waiting for the publisher to be receiving yet.
	startSub = make(chan bool, 1)
)

// channel is the Redis pub/sub channel name used by both sides of the demo.
const channel = "pubc1"
func main() { | |
done := make(chan struct{}) | |
go redisv4Pub("one", done) | |
go redisv4Sub(done) | |
time.Sleep(time.Second * 1) | |
close(done) | |
println("exiting") | |
// time.Sleep(time.Second * 1) | |
} | |
// Notification is the JSON payload that the publisher sends over the Redis
// channel and that the subscriber decodes.
type Notification struct {
	// Id is set by the publisher to its running message counter.
	Id int `json:"Id"`

	// Time is set by the publisher when the message is built; the subscriber
	// uses it to report how long delivery took.
	Time time.Time `json:"Time"`

	// Extra is a free-form attachment. Nothing in this file sets it, so it is
	// encoded as JSON null.
	Extra interface{} `json:"Extra"`
}
func redisv4Pub(index string, done chan struct{}) { | |
//let sub run first | |
println("pub waiting signal of starting sub") | |
<-startSub | |
println("pub receivd signal of starting sub, start pub") | |
redisClient := redis.NewTCPClient(&redis.Options{ | |
Addr: fmt.Sprintf("%s:%d", redisAddr, 6379), | |
Password: "", | |
DB: 0, | |
}) | |
defer redisClient.Close() | |
errcmd := redisClient.Ping() | |
if errcmd.Err() != nil { | |
panic("network error") | |
} | |
count := 0 | |
for { | |
count++ | |
data := &Notification{ | |
Id: count, | |
Time: time.Now(), | |
} | |
bs, err := json.Marshal(data) | |
if err != nil { | |
panic(err.Error()) | |
} | |
cmd := redisClient.Publish(channel, string(bs)) | |
if cmd.Err() != nil { | |
panic("pub error") | |
} | |
// log.Printf("publicing msg: %+v\n", cmd.String()) | |
select { | |
case <-done: | |
println("exit pub") | |
return | |
default: | |
// fmt.Printf("pub select default count: %d\n", count) | |
} | |
// time.Sleep(time.Second) | |
} | |
} | |
func redisv4Sub(done chan struct{}) { | |
println("sub send starting signal") | |
redisClient := redis.NewTCPClient(&redis.Options{ | |
Addr: fmt.Sprintf("%s:%d", redisAddr, 6379), | |
Password: "", | |
DB: 0, | |
}) | |
errcmd := redisClient.Ping() | |
if errcmd.Err() != nil { | |
panic("network error") | |
} | |
pubsub := redisClient.PubSub() | |
defer pubsub.Close() | |
pubsub.Subscribe(channel) | |
startSub <- true | |
println("sub sent starting signal,start sub") | |
// pubsub.Unsubscribe(channel) | |
var msg interface{} | |
var err error | |
data := &Notification{} | |
// count := 0 | |
for { | |
// count++ | |
msg, err = pubsub.ReceiveTimeout(time.Second * 2) | |
// msg, err = pubsub.Receive() | |
if err != nil { | |
log.Printf("read timeout %d of redis sub", 2) | |
} | |
switch v := msg.(type) { | |
case *redis.Message: | |
// fmt.Printf("getting msg [%s: message: %s]\n", v.Channel, v.Payload) | |
// log.Printf("----getting msg [%s: message: %s]\n", v.Channel, v.Payload) | |
err = json.Unmarshal([]byte(v.Payload), data) | |
if err != nil { | |
panic(err.Error()) | |
} | |
log.Printf("sub data: %+v\n", data) | |
fmt.Printf("time elapse: %+v\n", time.Since(data.Time)) | |
case *redis.Subscription: | |
fmt.Printf("%s: %s %d\n", v.Channel, v.Kind, v.Count) | |
case error: | |
println("error") | |
return | |
} | |
// time.Sleep(time.Second * 2) | |
select { | |
case <-done: | |
println("exit sub") | |
return | |
default: | |
// fmt.Printf("--sub select default count: %d\n", count) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment