Skip to content

Instantly share code, notes, and snippets.

@FarhanSajid1
Created February 6, 2022 00:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save FarhanSajid1/2f37a172f783daee25c9f7026bc9d1fd to your computer and use it in GitHub Desktop.
Save FarhanSajid1/2f37a172f783daee25c9f7026bc9d1fd to your computer and use it in GitHub Desktop.
Example highlighting the issue where not all keyspace notifications are sent during a test
package server
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
"regexp"
"strings"
)
var (
// deleteMsgRegex is used for deleting and expired keys.
// NOTE: we ignore expire keys because those keys are not fully expired
deleteMsgRegex = regexp.MustCompile(`(del|expired)`)
)
func sendUserInfo(ctx context.Context, r *redis.Client) {
key := "__key*__:user*"
subChan := r.PSubscribe(ctx, key)
defer subChan.Close()
receiveChan := subChan.Channel()
for {
select {
case <-ctx.Done():
return
case msg := <-receiveChan:
if msg == nil {
continue
}
// only send messages that have a payload of set
if strings.Contains(msg.Payload, "set") {
fmt.Println("received set operation")
}
if deleteMsgRegex.Match([]byte(msg.Payload)) {
fmt.Println("received expired message")
}
}
}
}
// tests
package server
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
uuid "github.com/satori/go.uuid"
"github.com/stretchr/testify/require"
"os"
"sync"
"testing"
"time"
)
func Test_sendUserInfo(t *testing.T) {
dsn, ok := os.LookupEnv("REDIS_TESTING_DSN")
if !ok {
t.Skipf("REDIS_TESTING_DSN is required for this benchmark, skipping for now")
}
// redis setup
opts, err := redis.ParseURL(dsn)
if err != nil {
t.Skipf("could not parse the redis url, err: %v", err)
}
r := require.New(t)
client := redis.NewClient(opts)
defer client.Close()
// flush and setup keyspace events
ctx := context.Background()
client.FlushAll(ctx)
client.ConfigSet(ctx, "notify-keyspace-events", "KEA")
errC := make(chan error)
go func() {
sendUserInfo(ctx, client)
}()
userCount := 1
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < userCount; i++ {
userID := uuid.NewV4().String()
key := fmt.Sprintf("user-%s", userID)
data := fmt.Sprintf("dummy" + key)
resp := client.Set(ctx, key, data, time.Minute*10)
r.NoError(resp.Err())
}
}()
// wait until everything is in redis
wg.Wait()
// confirm the number of keys in the database
keys := client.Keys(ctx, "user*")
r.NoError(keys.Err())
r.Equal(userCount, len(keys.Val()))
timer := time.NewTimer(1 * time.Second)
Loop:
for {
select {
case err := <-errC:
r.NoError(err)
case <-timer.C:
break Loop
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment