Skip to content

Instantly share code, notes, and snippets.

@banks
Created December 11, 2018 16:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save banks/f8429dfbf6b3c8c0145afeb9be16caa4 to your computer and use it in GitHub Desktop.
Save banks/f8429dfbf6b3c8c0145afeb9be16caa4 to your computer and use it in GitHub Desktop.
package main
import (
"context"
"flag"
"fmt"
"net"
"os"
"os/signal"
"time"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/structs"
"golang.org/x/time/rate"
)
func main() {
var nBlockers int
var nInstances int
var instanceChurn float64
var serverAddr = &net.TCPAddr{net.ParseIP("127.0.0.1"), 8300, ""}
flag.IntVar(&nBlockers, "c", 10, "number of concurrent blocking query clients to run")
flag.IntVar(&nInstances, "n", 1000, "number of service instances to register")
flag.Float64Var(&instanceChurn, "r", 1.0, "instance churn rate in changes/s")
flag.Parse()
connPool := &pool.ConnPool{
SrcAddr: nil,
LogOutput: os.Stderr,
MaxTime: time.Hour,
MaxStreams: 1000000,
TLSWrapper: nil,
ForceTLS: false,
}
fmt.Printf("==> Registering %d instances of test service...\n", nInstances)
// Register services
for i := 0; i < nInstances; i++ {
args := structs.RegisterRequest{
Datacenter: "dc1",
Node: fmt.Sprintf("test-%08x", i),
Address: "127.0.0.1",
Service: &structs.NodeService{
Service: "test",
Address: "",
Port: 10000 + i,
},
}
var resp struct{}
err := connPool.RPC("dc1", serverAddr, 3,
"Catalog.Register", false, &args, &resp)
if err != nil {
panic(err)
}
}
fmt.Printf("==> Registered %d instances of test service\n", nInstances)
fmt.Printf("==> Starting %d blocking clients on health endpoint\n", nBlockers)
for i := 0; i < nBlockers; i++ {
go runBlocker(i, connPool, serverAddr)
}
fmt.Printf("==> Starting churn process at churn rate %f Hz\n", instanceChurn)
go doChurn(connPool, serverAddr, instanceChurn)
// Run until signal
ch := make(chan os.Signal)
signal.Notify(ch, os.Kill, os.Interrupt)
<-ch
}
func doChurn(connPool *pool.ConnPool, serverAddr net.Addr, r float64) {
lim := rate.NewLimiter(rate.Limit(r), 1)
dereg := true
for {
lim.Wait(context.Background())
// Flip one instance
fmt.Printf(" churn dereg: %v\n", dereg)
if dereg {
args := structs.DeregisterRequest{
Datacenter: "dc1",
Node: fmt.Sprintf("test-%08x", 0),
ServiceID: "test",
}
var resp struct{}
err := connPool.RPC("dc1", serverAddr, 3,
"Catalog.Deregister", false, &args, &resp)
if err != nil {
panic(err)
}
} else {
args := structs.RegisterRequest{
Datacenter: "dc1",
Node: fmt.Sprintf("test-%08x", 0),
Address: "127.0.0.1",
Service: &structs.NodeService{
Service: "test",
Address: "",
Port: 10000,
},
}
var resp struct{}
err := connPool.RPC("dc1", serverAddr, 3,
"Catalog.Register", false, &args, &resp)
if err != nil {
panic(err)
}
}
dereg = !dereg
}
}
func runBlocker(n int, connPool *pool.ConnPool, serverAddr net.Addr) {
args := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "test",
Source: structs.QuerySource{
Datacenter: "dc1",
Node: "test-1",
},
QueryOptions: structs.QueryOptions{
MaxQueryTime: 10 * time.Minute,
},
}
for {
var resp *structs.IndexedCheckServiceNodes
err := connPool.RPC("dc1", serverAddr, 3,
"Health.ServiceNodes", false, &args, &resp)
if err != nil {
panic(err)
}
fmt.Printf(" %08d Got blocking response at index %d\n", n, resp.Index)
args.QueryOptions.MinQueryIndex = resp.Index
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment