Skip to content

Instantly share code, notes, and snippets.

@rafaeljusto
Created February 27, 2016 20:19
Show Gist options
  • Save rafaeljusto/55839bfc2325c1d05596 to your computer and use it in GitHub Desktop.
Save rafaeljusto/55839bfc2325c1d05596 to your computer and use it in GitHub Desktop.
redigomock - sending multiple messages to a subscription
package main
import (
"fmt"
"os"
"github.com/garyburd/redigo/redis"
)
// Connection settings shared by the example program and its test.
const (
// redisAddress is the TCP address main dials for both of its connections.
redisAddress = "127.0.0.1:6379"
// redisChannel is the pub/sub channel that readMessages subscribes to and
// sendMessages publishes on.
redisChannel = "example"
)
// readMessages subscribes to redisChannel on c and streams the payload of
// every message received there over the returned channel.
//
// The work happens in a background goroutine. When subscribing or receiving
// fails, the goroutine sends one final item of the form "error:<text>" and
// then closes the channel, so a range loop over the result terminates.
//
// NOTE(review): the channel is unbuffered, so if the caller stops receiving
// (as main does once it sees "finish") the goroutine stays blocked on its
// next send. Fixing that needs a cancellation signal, which this signature
// does not provide.
func readMessages(c redis.Conn) <-chan string {
	messages := make(chan string)
	go func() {
		// Close exactly once on every exit path so receivers are released.
		defer close(messages)

		// Keyed field: unkeyed literals of imported structs are flagged by
		// go vet and break if the struct ever gains a field.
		psc := redis.PubSubConn{Conn: c}

		// A failed subscription used to be ignored; surface it the same way
		// receive errors are surfaced.
		if err := psc.Subscribe(redisChannel); err != nil {
			messages <- "error:" + err.Error()
			return
		}

		for {
			switch v := psc.Receive().(type) {
			case redis.Message:
				messages <- string(v.Data)
			case error:
				messages <- "error:" + v.Error()
				return
			}
			// Any other reply type is intentionally skipped.
		}
	}()
	return messages
}
// sendMessages publishes the payloads "value1", "value2", "value3" and
// finally "finish" to redisChannel over c. The commands are queued with Send
// and written out with a single Flush.
//
// The function has no error return, so a failed Send or Flush is printed to
// standard output in the same "error:<text>" form readMessages uses, and the
// remaining work is abandoned instead of being silently ignored.
func sendMessages(c redis.Conn) {
	for _, payload := range []string{"value1", "value2", "value3", "finish"} {
		if err := c.Send("publish", redisChannel, payload); err != nil {
			fmt.Println("error:" + err.Error())
			return
		}
	}
	if err := c.Flush(); err != nil {
		fmt.Println("error:" + err.Error())
	}
}
// main runs the example and exits with status 1 if a connection cannot be
// established.
func main() {
	// os.Exit does not run deferred calls, so all work that defers cleanup
	// lives in run and the exit happens only after run has returned.
	if err := run(); err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
}

// run opens one connection for reading and one for publishing, publishes the
// example messages, and prints every received message until "finish" arrives
// or the message channel is closed. It returns the dial error, if any.
func run() error {
	// connection for reading the messages
	c1, err := redis.Dial("tcp", redisAddress)
	if err != nil {
		return err
	}
	defer c1.Close()
	messages := readMessages(c1)

	// connection for sending the messages
	c2, err := redis.Dial("tcp", redisAddress)
	if err != nil {
		// Returning (rather than exiting here) lets the deferred c1.Close run.
		return err
	}
	defer c2.Close()

	// NOTE(review): readMessages subscribes in a background goroutine, so
	// nothing here guarantees the subscription is active before publishing
	// starts; confirm whether early messages can be missed.
	sendMessages(c2)

	for msg := range messages {
		if msg == "finish" {
			break
		}
		fmt.Println(msg)
	}
	return nil
}
package main
import (
"reflect"
"testing"
"github.com/rafaeljusto/redigomock"
)
// TestReadMessages feeds readMessages from a mocked Redis connection and
// checks that the payloads delivered before the "finish" marker match the
// expectation of each scenario.
func TestReadMessages(t *testing.T) {
	conn := redigomock.NewConn()

	// pubSubMessage builds the raw reply for one payload published on
	// redisChannel.
	pubSubMessage := func(payload string) []interface{} {
		return []interface{}{
			[]byte("message"),
			[]byte(redisChannel),
			[]byte(payload),
		}
	}

	cases := []struct {
		description string
		stub        func()
		expected    []string
	}{
		{
			description: "it should read multiple messages from a subscription",
			stub: func() {
				conn.Command("SUBSCRIBE", redisChannel).Expect([]interface{}{
					[]byte("subscribe"),
					[]byte(redisChannel),
					[]byte("1"),
				})
				for _, payload := range []string{"value1", "value2", "value3", "finish"} {
					conn.AddSubscriptionMessage(pubSubMessage(payload))
				}
				// NOTE(review): the original author questioned why this flag
				// has to be set by hand; confirm against the redigomock docs.
				conn.Subscription = true
			},
			expected: []string{"value1", "value2", "value3"},
		},
	}

	for idx, tc := range cases {
		// Start every scenario from a mock with no leftover stubs.
		conn.Clear()
		tc.stub()

		var got []string
		for msg := range readMessages(conn) {
			if msg == "finish" {
				break
			}
			got = append(got, msg)
		}

		if reflect.DeepEqual(tc.expected, got) {
			continue
		}
		t.Errorf("[%d] %s: unexpected messages. Expected “%#v” and got “%#v”",
			idx, tc.description, tc.expected, got)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment