Skip to content

Instantly share code, notes, and snippets.

@ripienaar
Created March 23, 2023 13:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ripienaar/736b1940c341ad2bc99454779d97d1fb to your computer and use it in GitHub Desktop.
Save ripienaar/736b1940c341ad2bc99454779d97d1fb to your computer and use it in GitHub Desktop.
package main
import (
"context"
"fmt"
"github.com/choria-io/fisk"
"github.com/nats-io/jsm.go/natscontext"
"github.com/nats-io/nats.go"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"
)
// Package-level state shared between the CLI setup in main and the command actions.
var (
// subject is the NATS subject to test on; set from the required "subject"
// argument of both the publish and subscribe commands.
subject string
// nctx is the value of the --context flag and is passed to natscontext.Connect.
nctx string
// interval is the publish interval set by --interval/-i (default "1s").
interval time.Duration
// ctx is created in main with context.WithCancel; the publisher and
// subscriber loops exit once it is done.
ctx context.Context
// cancel cancels ctx; interruptWatcher calls it on SIGINT or SIGTERM.
cancel context.CancelFunc
)
// main builds the "ct" command line application with its "publish" and
// "subscribe" commands, creates the cancellable package-level context, starts
// the signal watcher and then hands control to the argument parser, which
// runs the action of the selected command.
func main() {
	app := fisk.New("ct", "NATS connection latency test")
	app.Flag("context", "NATS Context to connect with").StringVar(&nctx)

	// publish: sends a timestamped request every --interval.
	pubCmd := app.Command("publish", "Start a publisher")
	pubCmd.Alias("pub")
	pubCmd.Action(runPublisher)
	pubCmd.Arg("subject", "Subject to test on").Required().StringVar(&subject)
	pubCmd.Flag("interval", "Publish interval").Short('i').Default("1s").DurationVar(&interval)

	// subscribe: answers requests with the measured publish latency.
	subCmd := app.Commandf("subscribe", "Start a subscriber")
	subCmd.Alias("sub")
	subCmd.Action(runSubscriber)
	subCmd.Arg("subject", "Subject to test on").Required().StringVar(&subject)

	// The context must exist before the watcher goroutine starts, as the
	// watcher reads both ctx and cancel.
	ctx, cancel = context.WithCancel(context.Background())
	go interruptWatcher()

	app.MustParseWithUsage(os.Args[1:])
}
// connectNats opens a NATS connection using the context named by the
// --context flag (nctx) and returns the connection or the connect error.
func connectNats() (*nats.Conn, error) {
	conn, err := natscontext.Connect(nctx)
	return conn, err
}
// runPublisher is the action of the "publish" command.
//
// Every interval it sends a request on subject whose payload is the current
// time in Unix nanoseconds and waits up to 2 seconds for a reply. The reply
// must consist of two space separated fields; the second one is the publish
// latency in nanoseconds as measured by the subscriber. For every valid reply
// it prints the round trip time, the publish latency and the ratio between
// the two ("rtt symmetry"), each followed in parentheses by the average over
// a rolling window of recent samples.
//
// The publish latency is computed on the subscriber from the publisher's
// timestamp, so it is only meaningful when both clocks are synchronised.
//
// It returns an error when the interval is not positive or the connection
// cannot be established, and nil once the package-level ctx is cancelled.
func runPublisher(_ *fisk.ParseContext) error {
	// time.NewTicker panics on a non-positive duration, so reject it up front.
	if interval <= 0 {
		return fmt.Errorf("interval must be positive, got %v", interval)
	}

	nc, err := connectNats()
	if err != nil {
		return err
	}
	defer nc.Close()

	// maxSamples bounds each rolling window used for the averages.
	const maxSamples = 51

	var (
		// errs and corruptions are counted but currently not reported.
		errs         int
		corruptions  int
		symmetryHist []float64
		rttHist      []time.Duration
		platHist     []time.Duration
	)

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			start := time.Now()

			// done is named so that it does not shadow the package-level cancel.
			to, done := context.WithTimeout(ctx, 2*time.Second)
			res, err := nc.RequestWithContext(to, subject, []byte(strconv.FormatInt(start.UnixNano(), 10)))
			done()
			if err != nil {
				fmt.Printf("Error publishing: %v\n", err)
				errs++
				continue
			}

			parts := strings.Split(string(res.Data), " ")
			if len(parts) != 2 {
				fmt.Printf("Invalid response %q\n", res.Data)
				corruptions++
				continue
			}

			// The value is an int64 nanosecond count; ParseInt with bitSize 64
			// keeps it intact on platforms where int is 32 bits.
			ns, err := strconv.ParseInt(parts[1], 10, 64)
			if err != nil {
				fmt.Printf("Invalid response: %q: %v\n", res.Data, err)
				corruptions++
				continue
			}

			since := time.Since(start)
			pubRtt := time.Duration(ns)
			// Floating point division: a zero publish latency yields Inf/NaN
			// rather than a panic.
			symmetry := float64(since) / float64(pubRtt)

			// Drop the oldest sample of a full window before appending.
			if len(rttHist) == maxSamples {
				rttHist = rttHist[1:]
			}
			if len(platHist) == maxSamples {
				platHist = platHist[1:]
			}
			if len(symmetryHist) == maxSamples {
				symmetryHist = symmetryHist[1:]
			}

			rttHist = append(rttHist, since)
			platHist = append(platHist, pubRtt)
			symmetryHist = append(symmetryHist, symmetry)

			fmt.Printf("%v roundtrip time: %v (%v) publish latency: %v (%v) rtt symmetry: %0.3f (%0.3f)\n",
				start.Format(time.RFC3339),
				since.Round(time.Microsecond), avgDuration(rttHist).Round(time.Microsecond),
				pubRtt.Round(time.Microsecond), avgDuration(platHist).Round(time.Microsecond),
				symmetry, avgFloats(symmetryHist))

		case <-ctx.Done():
			return nil
		}
	}
}
// avgFloats returns the arithmetic mean of d.
//
// An empty or nil slice yields 0 instead of the NaN that dividing by a zero
// length would produce.
func avgFloats(d []float64) float64 {
	if len(d) == 0 {
		return 0
	}

	var sum float64
	for _, v := range d {
		sum += v
	}

	return sum / float64(len(d))
}
func avgDuration(d []time.Duration) time.Duration {
sum := time.Duration(0)
for _, i := range d {
sum += i
}
return time.Duration(int(sum) / len(d))
}
// runSubscriber is the action of the "subscribe" command.
//
// It subscribes to subject and treats every message payload as a Unix
// nanosecond timestamp taken by the sender. For each message it prints the
// elapsed time since that timestamp (the publish latency) and replies with
// "<current unix nanoseconds> <publish latency in nanoseconds>".
//
// The latency compares the sender's clock with the local one, so it is only
// meaningful when both clocks are synchronised.
//
// It returns an error when connecting or subscribing fails, and nil once the
// package-level ctx is cancelled.
func runSubscriber(_ *fisk.ParseContext) error {
	nc, err := connectNats()
	if err != nil {
		return err
	}
	defer nc.Close()

	_, err = nc.Subscribe(subject, func(msg *nats.Msg) {
		// The payload is an int64 nanosecond timestamp; ParseInt with bitSize
		// 64 keeps it intact on platforms where int is 32 bits.
		ns, err := strconv.ParseInt(string(msg.Data), 10, 64)
		if err != nil {
			fmt.Printf("Invalid request: %q: %v\n", msg.Data, err)
			return
		}

		plat := time.Since(time.Unix(0, ns))
		fmt.Printf("%v Received a message with publish latency: %v\n", time.Now().Format(time.RFC3339), plat.Round(time.Microsecond))

		// %d on a time.Duration prints its integer nanosecond count.
		err = msg.Respond([]byte(fmt.Sprintf("%d %d", time.Now().UnixNano(), plat)))
		if err != nil {
			fmt.Printf("Reply failed: %v\n", err)
			return
		}
	})
	if err != nil {
		return err
	}

	// Messages are handled by the subscription callback; block until shutdown.
	<-ctx.Done()

	return nil
}
func interruptWatcher() {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
for {
select {
case sig := <-sigs:
switch sig {
case syscall.SIGINT, syscall.SIGTERM:
cancel()
}
case <-ctx.Done():
return
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment