Last active
April 25, 2019 09:35
-
-
Save JensRantil/56cc0d36d71d94aea57789bc2260013c to your computer and use it in GitHub Desktop.
A tiny application that connects to NATS and measure latencies to NATS.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module github.com/tink-ab/tink-backend/src/nats-checker | |
require ( | |
github.com/nats-io/gnatsd v1.4.1 // indirect | |
github.com/nats-io/go-nats v1.7.2 | |
github.com/nats-io/nkeys v0.0.2 // indirect | |
github.com/nats-io/nuid v1.0.1 // indirect | |
github.com/oklog/ulid v1.3.1 | |
github.com/prometheus/client_golang v0.9.2 | |
golang.org/x/sys v0.0.0-20190425045458-9f0b1ff7b46a // indirect | |
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0= | |
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= | |
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= | |
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= | |
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= | |
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= | |
github.com/nats-io/gnatsd v1.4.1 h1:RconcfDeWpKCD6QIIwiVFcvForlXpWeJP7i5/lDLy44= | |
github.com/nats-io/gnatsd v1.4.1/go.mod h1:nqco77VO78hLCJpIcVfygDP2rPGfsEHkGTUk94uh5DQ= | |
github.com/nats-io/go-nats v1.7.2 h1:cJujlwCYR8iMz5ofZSD/p2WLW8FabhkQ2lIEVbSvNSA= | |
github.com/nats-io/go-nats v1.7.2/go.mod h1:+t7RHT5ApZebkrQdnn6AhQJmhJJiKAvJUio1PiiCtj0= | |
github.com/nats-io/nkeys v0.0.2 h1:+qM7QpgXnvDDixitZtQUBDY9w/s9mu1ghS+JIbsrx6M= | |
github.com/nats-io/nkeys v0.0.2/go.mod h1:dab7URMsZm6Z/jp9Z5UGa87Uutgc2mVpXLC4B7TDb/4= | |
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= | |
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= | |
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= | |
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= | |
github.com/prometheus/client_golang v0.9.2 h1:awm861/B8OKDd2I/6o1dy3ra4BamzKhYOiGItCeZ740= | |
github.com/prometheus/client_golang v0.9.2/go.mod h1:OsXs2jCmiKlQ1lTBmv21f2mNfw4xf/QclQDMrYNZzcM= | |
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 h1:idejC8f05m9MGOsuEi1ATq9shN03HrxNkD/luQvxCv8= | |
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= | |
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275 h1:PnBWHBf+6L0jOqq0gIVUe6Yk0/QMZ640k6NvkxcBf+8= | |
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= | |
github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a h1:9a8MnZMP0X2nLJdBg+pBmGgkJlSaKC2KaQmTCk1XDtE= | |
github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= | |
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9 h1:mKdxBk7AujPs8kU4m80U72y/zjbZ3UcXC7dClwKbUI0= | |
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= | |
golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= | |
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f h1:Bl/8QSvNqXvPGPGXa2z5xUTmV7VDcZyvRZ+QQXkXTZQ= | |
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= | |
golang.org/x/sys v0.0.0-20190425045458-9f0b1ff7b46a h1:8cVTj31lbQ2I9z63Y1LjjHVKGisLuXDt12kKR0+r89w= | |
golang.org/x/sys v0.0.0-20190425045458-9f0b1ff7b46a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"flag" | |
"fmt" | |
"log" | |
"math/rand" | |
"net/http" | |
"os" | |
"sync" | |
"time" | |
"github.com/oklog/ulid" | |
"github.com/prometheus/client_golang/prometheus" | |
"github.com/prometheus/client_golang/prometheus/promauto" | |
"github.com/prometheus/client_golang/prometheus/promhttp" | |
nats "github.com/nats-io/go-nats" | |
) | |
var ( | |
connect = flag.String("connect", nats.DefaultURL, fmt.Sprintf("NATS URL. (default: %s)", nats.DefaultURL)) | |
clientCert = flag.String("client-cert", "", "Client certificate to connect to NATS.") | |
clientCertKey = flag.String("client-cert-key", "", "Client certificate key to connect to NATS.") | |
testInterval = flag.Duration("test-interval", 500*time.Millisecond, "How often the test should run.") | |
testTimeout = flag.Duration("test-timeout", 5*time.Second, "Maximum time to wait for a message in NATS.") | |
listen = flag.String("listen", ":8080", "The interface to listen on.") | |
debug = flag.Bool("debug", false, "Set to true for verbose logging.") | |
) | |
const PROM_NS = "natscheck" | |
var ( | |
endToEndLatency prometheus.Histogram | |
errors = promauto.NewCounterVec( | |
prometheus.CounterOpts{ | |
Namespace: PROM_NS, | |
Name: "test_error", | |
Help: "Counters for different types of errors.", | |
}, | |
[]string{"type", "client"}, | |
) | |
timeouts = promauto.NewCounter( | |
prometheus.CounterOpts{ | |
Namespace: PROM_NS, | |
Name: "test_timeout", | |
Help: "Number of times a ping timed out.", | |
}, | |
) | |
connectLatency = promauto.NewHistogram(prometheus.HistogramOpts{ | |
Namespace: PROM_NS, | |
Name: "nats_connect_latency", | |
Help: "A histogram showing latencies.", | |
}) | |
) | |
func init() { | |
flag.Parse() | |
buckets := generateBuckets(*testTimeout) | |
endToEndLatency = promauto.NewHistogram(prometheus.HistogramOpts{ | |
Namespace: PROM_NS, | |
Name: "test_latency", | |
Help: "A histogram showing test latencies.", | |
Buckets: buckets, | |
}) | |
} | |
var smallestTestLatencyBucket = durationToSeconds(500 * time.Microsecond) | |
func generateBuckets(maxBucket time.Duration) []float64 { | |
maxBucketSeconds := durationToSeconds(maxBucket) | |
var buckets []float64 | |
i := 1 | |
for len(buckets) == 0 || buckets[len(buckets)-1] < maxBucketSeconds { | |
buckets = prometheus.ExponentialBuckets(smallestTestLatencyBucket, 2, i) | |
i++ | |
} | |
buckets[len(buckets)-1] = maxBucketSeconds | |
return buckets | |
} | |
func main() { | |
if certpath := *clientCert; certpath != "" && !isReadable(certpath) { | |
log.Fatalln("Could not open client cert:", certpath) | |
} | |
if keypath := *clientCertKey; keypath != "" && !isReadable(keypath) { | |
log.Fatalln("Could not open client key:", keypath) | |
} | |
go startWebServer() | |
log.Println("Started.") | |
t := time.Tick(*testInterval) | |
for _ = range t { | |
go RunTest() | |
} | |
} | |
func startWebServer() { | |
http.Handle("/metrics", promhttp.Handler()) | |
log.Fatal(http.ListenAndServe(*listen, nil)) | |
} | |
func isReadable(path string) bool { | |
f, err := os.Open(path) | |
if err != nil { | |
return false | |
} | |
f.Close() | |
return true | |
} | |
func RunTest() { | |
var readyToSend sync.WaitGroup | |
subj := generateUniqueSubject() | |
debugLog("Generated subject for test:", subj) | |
readyToSend.Add(1) | |
go Receive(&readyToSend, subj) | |
Send(&readyToSend, subj) | |
} | |
func debugLog(s string, a ...string) { | |
if *debug { | |
var args []interface{} | |
args = append(args, interface{}(s)) | |
for _, e := range a { | |
// Can this be done in a simpler way? | |
args = append(args, interface{}(e)) | |
} | |
log.Println(args...) | |
} | |
} | |
var ulidReader = ulid.Monotonic(rand.New(rand.NewSource(time.Now().UnixNano())), 0) | |
func generateUniqueSubject() string { | |
return fmt.Sprintf("natscheck.%s", ulid.MustNew(ulid.Timestamp(time.Now()), ulidReader).String()) | |
} | |
func Send(readyToSend *sync.WaitGroup, subj string) { | |
nc, err := connectToNats() | |
if err != nil { | |
log.Println("Unable to connect:", err) | |
incrementError(SenderClient, "could not connect") | |
// Important for the client to not wait forever. | |
readyToSend.Done() | |
return | |
} | |
defer nc.Close() | |
readyToSend.Wait() | |
message, _ := time.Now().GobEncode() | |
nc.Publish(subj, message) | |
} | |
func connectToNats() (*nats.Conn, error) { | |
connectStart := time.Now() | |
defer registerLatency(connectLatency, time.Now().Sub(connectStart)) | |
var options []nats.Option | |
if *clientCert != "" && *clientCertKey != "" { | |
options = append(options, nats.ClientCert(*clientCert, *clientCertKey)) | |
} | |
nc, err := nats.Connect(*connect, options...) | |
if !nc.IsConnected() { | |
// Assert we understand the NATS API properly. | |
log.Fatalln("Unable to connect.") | |
} | |
debugLog("Connected.") | |
return nc, err | |
} | |
func Receive(started *sync.WaitGroup, subj string) { | |
nc, err := connectToNats() | |
if err != nil { | |
log.Println("Unable to connect:", err) | |
incrementError(ReceiverClient, "could not connect") | |
// Important for the client to not wait forever. | |
started.Done() | |
return | |
} | |
defer nc.Close() | |
sub, err := nc.SubscribeSync(subj) | |
if err != nil { | |
log.Println("Unable to create subscription:", err) | |
incrementError(ReceiverClient, "could not create subscription") | |
// Important for the client to not wait forever. | |
started.Done() | |
return | |
} | |
sub.AutoUnsubscribe(1) | |
// Flushing to make sure that the subscription is synchronized with the server. | |
if err := nc.Flush(); err != nil { | |
log.Println("Unable to flush connection:", err) | |
incrementError(ReceiverClient, "could not flush connection") | |
// Important for the client to not wait forever. | |
started.Done() | |
return | |
} | |
m, err := sub.NextMsg(*testTimeout) | |
if err != nil { | |
if err == nats.ErrTimeout { | |
timeouts.Inc() | |
} else { | |
log.Println("Unable to receive message:", err) | |
} | |
incrementError(ReceiverClient, "could not receive message") | |
return | |
} | |
var sent time.Time | |
_ = sent.GobDecode(m.Data) | |
// Signal to the sender it can now send the message. | |
started.Done() | |
latency := time.Now().Sub(sent) | |
registerLatency(endToEndLatency, latency) | |
} | |
var secondAsFloat = float64(time.Second) | |
func durationToSeconds(d time.Duration) float64 { | |
return float64(d) / secondAsFloat | |
} | |
func registerLatency(h prometheus.Histogram, d time.Duration) { | |
h.Observe(durationToSeconds(d)) | |
} | |
type ClientType int | |
const ( | |
ReceiverClient ClientType = iota | |
SenderClient = iota | |
) | |
func incrementError(c ClientType, errDesc string) { | |
errors.With(prometheus.Labels{"type": errDesc, "client": clientTypeString(c)}) | |
} | |
func clientTypeString(c ClientType) string { | |
switch c { | |
case ReceiverClient: | |
return "receiver" | |
case SenderClient: | |
return "sender" | |
default: | |
panic(fmt.Sprintf("unrecognized client: %+v", c)) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment