Skip to content

Instantly share code, notes, and snippets.

@artyom
Last active June 2, 2020 15:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save artyom/5ec1cf2ad45b2b31d869df1ef0f7aee6 to your computer and use it in GitHub Desktop.
Save artyom/5ec1cf2ad45b2b31d869df1ef0f7aee6 to your computer and use it in GitHub Desktop.
module radix-bug
go 1.14
require (
github.com/mediocregopher/radix/v3 v3.5.1
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a
)
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/mediocregopher/radix/v3 v3.5.1 h1:IOYgQUMA380N4khaL5eNT4v/P2LnHa8b0wnVdwZMFsY=
github.com/mediocregopher/radix/v3 v3.5.1/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a h1:WXEvlFVvvGxCJLG6REjsT03iWnKLEWinaScsxF2Vm2o=
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898 h1:/atklqdjdhuosWIl6AIbOeHJjicWYPqR9bpxqxYG2pA=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
package main
import (
"context"
"errors"
"flag"
"fmt"
"log"
"os"
"path/filepath"
"runtime/debug"
"sync/atomic"
"time"
"github.com/mediocregopher/radix/v3"
"golang.org/x/sync/errgroup"
)
// main parses the command line and passes the first positional argument
// (the server address) to run. If run reports an error, the error text is
// written to standard error and the process exits with status 1.
func main() {
	// Set the runtime traceback level to "all" so a crash dumps more than
	// just the panicking goroutine.
	debug.SetTraceback("all")
	flag.Parse()

	err := run(flag.Arg(0))
	if err == nil {
		return
	}
	fmt.Fprintln(os.Stderr, err)
	os.Exit(1)
}
// run drives a publish/subscribe round trip against the server at addr and
// watches for the subscriber falling behind the publisher.
//
// It starts three goroutines under one errgroup:
//
//   - a publisher that issues PUBLISH on a fixed channel every 10ms through
//     a connection pool and increments the drift counter per publish;
//   - a subscriber that receives from the same channel on a dedicated
//     connection, decrements the drift counter per message, and pings the
//     connection every 5 seconds;
//   - a monitor that logs the drift counter every 2 seconds and panics once
//     it exceeds 1000.
//
// run returns a usage error when addr is empty. Otherwise it blocks until one
// of the goroutines fails and returns the first error reported by the group.
func run(addr string) error {
	if addr == "" {
		return fmt.Errorf("usage: %s host:port", filepath.Base(os.Args[0]))
	}
	const name = "some-channel"
	// drift is the number of messages published minus the number received.
	// It is shared by all three goroutines and only touched atomically.
	var drift int64
	group, ctx := errgroup.WithContext(context.Background())

	// Publisher.
	group.Go(func() error {
		pool, err := radix.NewPool("tcp", addr, 4)
		if err != nil {
			return fmt.Errorf("creating publisher pool: %w", err)
		}
		defer pool.Close()
		ticker := time.NewTicker(10 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				if err := pool.Do(radix.Cmd(nil, "PUBLISH", name, "payload")); err != nil {
					return fmt.Errorf("publishing to %q: %w", name, err)
				}
				atomic.AddInt64(&drift, 1)
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	})

	// Subscriber.
	group.Go(func() error {
		rc, err := radix.Dial("tcp", addr,
			radix.DialReadTimeout(time.Second/2),
			radix.DialWriteTimeout(time.Second/2),
			radix.DialConnectTimeout(time.Second/2))
		if err != nil {
			return fmt.Errorf("dialing subscriber connection: %w", err)
		}
		defer rc.Close()
		conn := radix.PubSub(rc)
		defer conn.Close()
		// The channel is intentionally left unbuffered, as in the original
		// scenario this program exercises.
		ch := make(chan radix.PubSubMessage)
		if err := conn.Subscribe(ch, name); err != nil {
			return fmt.Errorf("subscribing to %q: %w", name, err)
		}
		ticker := time.NewTicker(5 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-ticker.C:
				if err := conn.Ping(); err != nil {
					return fmt.Errorf("pinging subscriber connection: %w", err)
				}
			case _, ok := <-ch:
				// Check for a closed channel first so that a zero-value
				// receive is not counted as a delivered message.
				if !ok {
					return errors.New("zero value receive")
				}
				atomic.AddInt64(&drift, -1)
			}
		}
	})

	// Monitor.
	group.Go(func() error {
		ticker := time.NewTicker(2 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-ticker.C:
				d := atomic.LoadInt64(&drift)
				log.Println("send vs receive drift", d)
				if d > 1000 {
					// Panic rather than return an error: this crashes the
					// process with a traceback instead of unwinding quietly.
					panic("drift is too big, receiver likely stuck")
				}
			}
		}
	})

	return group.Wait()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment