Skip to content

Instantly share code, notes, and snippets.

@JensRantil JensRantil/go.mod
Last active Apr 25, 2019

Embed
What would you like to do?
A tiny application that connects to NATS and measure latencies to NATS.
module github.com/tink-ab/tink-backend/src/nats-checker
require (
github.com/nats-io/gnatsd v1.4.1 // indirect
github.com/nats-io/go-nats v1.7.2
github.com/nats-io/nkeys v0.0.2 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/oklog/ulid v1.3.1
github.com/prometheus/client_golang v0.9.2
golang.org/x/sys v0.0.0-20190425045458-9f0b1ff7b46a // indirect
)
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/nats-io/gnatsd v1.4.1 h1:RconcfDeWpKCD6QIIwiVFcvForlXpWeJP7i5/lDLy44=
github.com/nats-io/gnatsd v1.4.1/go.mod h1:nqco77VO78hLCJpIcVfygDP2rPGfsEHkGTUk94uh5DQ=
github.com/nats-io/go-nats v1.7.2 h1:cJujlwCYR8iMz5ofZSD/p2WLW8FabhkQ2lIEVbSvNSA=
github.com/nats-io/go-nats v1.7.2/go.mod h1:+t7RHT5ApZebkrQdnn6AhQJmhJJiKAvJUio1PiiCtj0=
github.com/nats-io/nkeys v0.0.2 h1:+qM7QpgXnvDDixitZtQUBDY9w/s9mu1ghS+JIbsrx6M=
github.com/nats-io/nkeys v0.0.2/go.mod h1:dab7URMsZm6Z/jp9Z5UGa87Uutgc2mVpXLC4B7TDb/4=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/prometheus/client_golang v0.9.2 h1:awm861/B8OKDd2I/6o1dy3ra4BamzKhYOiGItCeZ740=
github.com/prometheus/client_golang v0.9.2/go.mod h1:OsXs2jCmiKlQ1lTBmv21f2mNfw4xf/QclQDMrYNZzcM=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 h1:idejC8f05m9MGOsuEi1ATq9shN03HrxNkD/luQvxCv8=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275 h1:PnBWHBf+6L0jOqq0gIVUe6Yk0/QMZ640k6NvkxcBf+8=
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a h1:9a8MnZMP0X2nLJdBg+pBmGgkJlSaKC2KaQmTCk1XDtE=
github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9 h1:mKdxBk7AujPs8kU4m80U72y/zjbZ3UcXC7dClwKbUI0=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f h1:Bl/8QSvNqXvPGPGXa2z5xUTmV7VDcZyvRZ+QQXkXTZQ=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190425045458-9f0b1ff7b46a h1:8cVTj31lbQ2I9z63Y1LjjHVKGisLuXDt12kKR0+r89w=
golang.org/x/sys v0.0.0-20190425045458-9f0b1ff7b46a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
package main
import (
"flag"
"fmt"
"log"
"math/rand"
"net/http"
"os"
"sync"
"time"
"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
nats "github.com/nats-io/go-nats"
)
var (
connect = flag.String("connect", nats.DefaultURL, fmt.Sprintf("NATS URL. (default: %s)", nats.DefaultURL))
clientCert = flag.String("client-cert", "", "Client certificate to connect to NATS.")
clientCertKey = flag.String("client-cert-key", "", "Client certificate key to connect to NATS.")
testInterval = flag.Duration("test-interval", 500*time.Millisecond, "How often the test should run.")
testTimeout = flag.Duration("test-timeout", 5*time.Second, "Maximum time to wait for a message in NATS.")
listen = flag.String("listen", ":8080", "The interface to listen on.")
debug = flag.Bool("debug", false, "Set to true for verbose logging.")
)
const PROM_NS = "natscheck"
var (
endToEndLatency prometheus.Histogram
errors = promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: PROM_NS,
Name: "test_error",
Help: "Counters for different types of errors.",
},
[]string{"type", "client"},
)
timeouts = promauto.NewCounter(
prometheus.CounterOpts{
Namespace: PROM_NS,
Name: "test_timeout",
Help: "Number of times a ping timed out.",
},
)
connectLatency = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: PROM_NS,
Name: "nats_connect_latency",
Help: "A histogram showing latencies.",
})
)
func init() {
flag.Parse()
buckets := generateBuckets(*testTimeout)
endToEndLatency = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: PROM_NS,
Name: "test_latency",
Help: "A histogram showing test latencies.",
Buckets: buckets,
})
}
var smallestTestLatencyBucket = durationToSeconds(500 * time.Microsecond)
func generateBuckets(maxBucket time.Duration) []float64 {
maxBucketSeconds := durationToSeconds(maxBucket)
var buckets []float64
i := 1
for len(buckets) == 0 || buckets[len(buckets)-1] < maxBucketSeconds {
buckets = prometheus.ExponentialBuckets(smallestTestLatencyBucket, 2, i)
i++
}
buckets[len(buckets)-1] = maxBucketSeconds
return buckets
}
func main() {
if certpath := *clientCert; certpath != "" && !isReadable(certpath) {
log.Fatalln("Could not open client cert:", certpath)
}
if keypath := *clientCertKey; keypath != "" && !isReadable(keypath) {
log.Fatalln("Could not open client key:", keypath)
}
go startWebServer()
log.Println("Started.")
t := time.Tick(*testInterval)
for _ = range t {
go RunTest()
}
}
func startWebServer() {
http.Handle("/metrics", promhttp.Handler())
log.Fatal(http.ListenAndServe(*listen, nil))
}
func isReadable(path string) bool {
f, err := os.Open(path)
if err != nil {
return false
}
f.Close()
return true
}
func RunTest() {
var readyToSend sync.WaitGroup
subj := generateUniqueSubject()
debugLog("Generated subject for test:", subj)
readyToSend.Add(1)
go Receive(&readyToSend, subj)
Send(&readyToSend, subj)
}
func debugLog(s string, a ...string) {
if *debug {
var args []interface{}
args = append(args, interface{}(s))
for _, e := range a {
// Can this be done in a simpler way?
args = append(args, interface{}(e))
}
log.Println(args...)
}
}
var ulidReader = ulid.Monotonic(rand.New(rand.NewSource(time.Now().UnixNano())), 0)
func generateUniqueSubject() string {
return fmt.Sprintf("natscheck.%s", ulid.MustNew(ulid.Timestamp(time.Now()), ulidReader).String())
}
func Send(readyToSend *sync.WaitGroup, subj string) {
nc, err := connectToNats()
if err != nil {
log.Println("Unable to connect:", err)
incrementError(SenderClient, "could not connect")
// Important for the client to not wait forever.
readyToSend.Done()
return
}
defer nc.Close()
readyToSend.Wait()
message, _ := time.Now().GobEncode()
nc.Publish(subj, message)
}
func connectToNats() (*nats.Conn, error) {
connectStart := time.Now()
defer registerLatency(connectLatency, time.Now().Sub(connectStart))
var options []nats.Option
if *clientCert != "" && *clientCertKey != "" {
options = append(options, nats.ClientCert(*clientCert, *clientCertKey))
}
nc, err := nats.Connect(*connect, options...)
if !nc.IsConnected() {
// Assert we understand the NATS API properly.
log.Fatalln("Unable to connect.")
}
debugLog("Connected.")
return nc, err
}
func Receive(started *sync.WaitGroup, subj string) {
nc, err := connectToNats()
if err != nil {
log.Println("Unable to connect:", err)
incrementError(ReceiverClient, "could not connect")
// Important for the client to not wait forever.
started.Done()
return
}
defer nc.Close()
sub, err := nc.SubscribeSync(subj)
if err != nil {
log.Println("Unable to create subscription:", err)
incrementError(ReceiverClient, "could not create subscription")
// Important for the client to not wait forever.
started.Done()
return
}
sub.AutoUnsubscribe(1)
// Flushing to make sure that the subscription is synchronized with the server.
if err := nc.Flush(); err != nil {
log.Println("Unable to flush connection:", err)
incrementError(ReceiverClient, "could not flush connection")
// Important for the client to not wait forever.
started.Done()
return
}
m, err := sub.NextMsg(*testTimeout)
if err != nil {
if err == nats.ErrTimeout {
timeouts.Inc()
} else {
log.Println("Unable to receive message:", err)
}
incrementError(ReceiverClient, "could not receive message")
return
}
var sent time.Time
_ = sent.GobDecode(m.Data)
// Signal to the sender it can now send the message.
started.Done()
latency := time.Now().Sub(sent)
registerLatency(endToEndLatency, latency)
}
var secondAsFloat = float64(time.Second)
func durationToSeconds(d time.Duration) float64 {
return float64(d) / secondAsFloat
}
func registerLatency(h prometheus.Histogram, d time.Duration) {
h.Observe(durationToSeconds(d))
}
type ClientType int
const (
ReceiverClient ClientType = iota
SenderClient = iota
)
func incrementError(c ClientType, errDesc string) {
errors.With(prometheus.Labels{"type": errDesc, "client": clientTypeString(c)})
}
func clientTypeString(c ClientType) string {
switch c {
case ReceiverClient:
return "receiver"
case SenderClient:
return "sender"
default:
panic(fmt.Sprintf("unrecognized client: %+v", c))
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.