Created
April 26, 2022 02:09
shimhang
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bytes" | |
"context" | |
"crypto/tls" | |
"flag" | |
"fmt" | |
"io/ioutil" | |
"log" | |
"math/rand" | |
"net" | |
"net/http" | |
"os" | |
"os/signal" | |
"path/filepath" | |
"strings" | |
"sync" | |
"syscall" | |
"time" | |
"github.com/containerd/containerd" | |
eventstypes "github.com/containerd/containerd/api/events" | |
"github.com/containerd/containerd/api/services/tasks/v1" | |
"github.com/containerd/containerd/api/services/ttrpc/events/v1" | |
"github.com/containerd/containerd/pkg/ttrpcutil" | |
shimclient "github.com/containerd/containerd/runtime/v2/shim" | |
"github.com/containerd/containerd/runtime/v2/task" | |
"github.com/containerd/ttrpc" | |
"github.com/containerd/typeurl" | |
ants "github.com/panjf2000/ants/v2" | |
) | |
var ( | |
defaultP *ants.Pool | |
capa = 8 | |
) | |
func init() { | |
pool, err := ants.NewPool(capa) | |
if err != nil { | |
panic(err) | |
} | |
defaultP = pool | |
// return p | |
} | |
// cid is the task ID (or ID prefix) selected on the command line. getTaskId
// replaces it with the full ID of the matching task.
var cid string
func getTaskId(cli *containerd.Client) string { | |
var ( | |
found bool | |
) | |
ctx := context.Background() | |
tt, err := cli.TaskService().List(ctx, &tasks.ListTasksRequest{}) | |
if err != nil { | |
panic(err) | |
} | |
if cid == "" { | |
panic("taskid not define") | |
} | |
for _, c := range tt.Tasks { | |
if strings.HasPrefix(c.ID, cid) { | |
cid = c.ID | |
found = true | |
break | |
} | |
} | |
if !found { | |
panic(fmt.Errorf("not found taskid %s", cid)) | |
} | |
return cid | |
} | |
func mkShimConnPool(addr string, concurrency int) []net.Conn { | |
var ( | |
//connch = make(chan net.Conn, concurrency+1) | |
conns = make([]net.Conn, concurrency) | |
) | |
data, err := ioutil.ReadFile(filepath.Join(addr, "address")) | |
if err != nil { | |
panic(err) | |
} | |
for i := 0; i < concurrency; i++ { | |
conn, err := shimclient.Connect(string(data), shimclient.AnonReconnectDialer) | |
if err != nil { | |
panic(err) | |
} | |
//connch <- conn | |
conns[i] = conn | |
} | |
return conns | |
} | |
// shim 并发执行命令 | |
// 并发数 batch | |
func mkShimBatch(cid string, connch []net.Conn, batch int) { | |
var ( | |
ctx = context.Background() | |
wg sync.WaitGroup | |
_, cancle = context.WithTimeout(ctx, time.Second*3) | |
) | |
defer cancle() | |
for _, v := range connch { | |
wg.Add(1) | |
tmpv := v | |
go func() { | |
defer wg.Done() | |
tcli := ttrpc.NewClient(tmpv) | |
client := task.NewTaskClient(tcli) | |
// err := tmpv.SetReadDeadline(time.Now().Add(time.Microsecond * 300)) | |
// if err != nil { | |
// cancle() | |
// fmt.Println("set read timeout failed:%v", err) | |
// return | |
// } | |
for i := 0; i < batch; i++ { | |
defaultP.Submit(func() { | |
_, err := client.Stats(ctx, &task.StatsRequest{ID: cid}) | |
if err != nil { | |
fmt.Printf("index %d status failed: %s\n", i, err) | |
} else { | |
fmt.Printf("index %d cid: %s \n ", i, cid) | |
} | |
}) | |
} | |
}() | |
} | |
wg.Wait() | |
} | |
//创建contaienrd 连接池 | |
func mkContainerdConnPool(addr string, number int) []*containerd.Client { | |
var ( | |
ss = make([]*containerd.Client, 0, number) | |
) | |
for i := 0; i < number; i++ { | |
ccli, err := containerd.New(addr, containerd.WithDefaultNamespace("k8s.io")) | |
if err != nil { | |
panic(err) | |
} | |
ss = append(ss, ccli) | |
} | |
return ss | |
} | |
// containerd 并发执行命令 | |
func mkContainerdBatch(cc []*containerd.Client, batch int) { | |
var ( | |
ctx = context.Background() | |
wg sync.WaitGroup | |
_, cancle = context.WithTimeout(ctx, time.Second*3) | |
) | |
defer cancle() | |
fmt.Println("start rpc call") | |
wg.Add(len(cc)) | |
for _, v := range cc { | |
ccli := v | |
go func() { | |
defer wg.Done() | |
for i := 0; i < batch; i++ { | |
_, err := ccli.TaskService().Metrics(ctx, &tasks.MetricsRequest{}) | |
if err != nil { | |
fmt.Printf("task Metrics failed: %s\n", err) | |
} else { | |
fmt.Printf("task Metrics ok on index %d\n", i) | |
} | |
} | |
}() | |
} | |
wg.Wait() | |
} | |
func containerdCommand(addr string, con, bat int) { | |
conns := mkContainerdConnPool(addr, con) | |
mkContainerdBatch(conns, bat) | |
} | |
func shimCommand(addr string, concurrency int, batch int, id string) { | |
connch := mkShimConnPool(addr, concurrency) | |
mkShimBatch(id, connch, batch) | |
} | |
func ebk(addr string, concurrency int) []events.EventsService { | |
var ( | |
ss = make([]events.EventsService, 0, concurrency) | |
) | |
for i := 0; i < concurrency; i++ { | |
cli, err := ttrpcutil.NewClient(addr) | |
if err != nil { | |
panic(err) | |
} | |
es, err := cli.EventsService() | |
if err != nil { | |
panic(err) | |
} | |
ss = append(ss, es) | |
} | |
return ss | |
} | |
// mkstr returns a string made of exactly size 'a' characters.
// A size of zero or less yields the empty string.
func mkstr(size int) string {
	if size <= 0 {
		return ""
	}
	// bytes.Repeat starts from an empty buffer. Seeding a bytes.Buffer with
	// make([]byte, size) would instead prepend size NUL bytes to the result.
	return string(bytes.Repeat([]byte{'a'}, size))
}
func mkRandom(size int) map[string]string { | |
var ( | |
keysize = 10 | |
i = 0 | |
ret = map[string]string{mkstr(keysize): mkstr(keysize)} | |
) | |
for { | |
half := size - (keysize * 2) | |
if half < 0 { | |
return ret | |
} | |
key := fmt.Sprintf("%s%d", mkstr(keysize), i) | |
ret[key] = key | |
i++ | |
size = half | |
} | |
} | |
func ect(cc []events.EventsService, batch int, size int) { | |
var ( | |
ctx = context.Background() | |
wg sync.WaitGroup | |
_, cancle = context.WithTimeout(ctx, time.Second*3) | |
) | |
defer cancle() | |
label := mkRandom(size) | |
// for k, v := range label { | |
// fmt.Printf("k:%s,v:%s\n", k, v) | |
// } | |
any, err := typeurl.MarshalAny(&eventstypes.NamespaceCreate{ | |
Name: "foo", | |
Labels: label, | |
}) | |
if err != nil { | |
panic(err) | |
} | |
req := &events.ForwardRequest{ | |
Envelope: &events.Envelope{ | |
Timestamp: time.Now(), | |
Namespace: "foo", | |
Topic: "/namespaces/create", | |
Event: any, | |
}, | |
} | |
fmt.Println("start rpc call") | |
wg.Add(len(cc)) | |
for _, v := range cc { | |
ccli := v | |
go func() { | |
defer wg.Done() | |
for i := 0; i < batch; i++ { | |
_, err := ccli.Forward(ctx, req) | |
if err != nil { | |
fmt.Printf("event forward failed: %s\n", err) | |
} else { | |
fmt.Printf("event forward ok on index %d\n", i) | |
} | |
} | |
}() | |
} | |
wg.Wait() | |
} | |
func eventCommand(addr string, concurrency int, batch int, size int) { | |
ss := ebk(addr, concurrency) | |
ect(ss, batch, size) | |
} | |
func Stats() { | |
rand.Seed(time.Now().Unix()) | |
var wg sync.WaitGroup | |
var nodename = flag.String("n", "node-1:10250", "hostport to query") | |
var stats = flag.Int("l", 3, "list stats at once") | |
var cert = flag.String("c", "", "cert file") | |
var key = flag.String("k", "", "key file") | |
flag.Parse() | |
tlscert, err := tls.LoadX509KeyPair(*cert, *key) | |
if err != nil { | |
panic(err) | |
} | |
trans := http.Transport{ | |
TLSClientConfig: &tls.Config{ | |
InsecureSkipVerify: true, | |
Certificates: []tls.Certificate{tlscert}, | |
}, | |
DialContext: (&net.Dialer{ | |
Timeout: 30 * time.Second, | |
KeepAlive: 30 * time.Second, | |
}).DialContext, | |
ForceAttemptHTTP2: true, | |
MaxIdleConns: 100, | |
IdleConnTimeout: 90 * time.Second, | |
TLSHandshakeTimeout: 10 * time.Second, | |
ExpectContinueTimeout: 1 * time.Second, | |
} | |
fmt.Printf("run call to stats %d times\n", *stats) | |
for i := 0; i < *stats; i++ { | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
_, err := get(fmt.Sprintf("https://%s/stats/summary?only_cpu_and_memory=true", *nodename), &trans) | |
if err != nil { | |
log.Printf("didn't find stats for container %v", err) | |
} | |
}() | |
} | |
wg.Wait() | |
} | |
// timing records the current time and returns a function that, when called,
// prints "<what> took <elapsed>" to stdout. Typical use:
//
//	defer timing("label")()
//
// Based on https://stackoverflow.com/a/45766707/697126
func timing(what string) func() {
	begin := time.Now()
	report := func() {
		elapsed := time.Since(begin)
		fmt.Printf("%s took %v\n", what, elapsed)
	}
	return report
}
func get(url string, cfg *http.Transport) (string, error) { | |
defer timing(fmt.Sprintf("timing: %s", url))() | |
http.DefaultTransport = cfg | |
response, err := http.Get(url) | |
if err != nil { | |
return "", err | |
} | |
defer response.Body.Close() | |
contents, err := ioutil.ReadAll(response.Body) | |
if err != nil { | |
return "", err | |
} | |
return string(contents), nil | |
} | |
func Shim() { | |
id := flag.String("id", "", "bundle path") | |
addr := flag.String("a", "", "bundle path") | |
con := flag.Int("c", 2, "concurrency number") | |
bat := flag.Int("b", 3, "batch call in one connect") | |
size := flag.Int("s", 1024, "batch call in one connect") | |
grpc := flag.Bool("g", false, "batch call in one connect") | |
flag.Parse() | |
cid = *id | |
fmt.Println(*con, *bat) | |
info, err := os.Stat(*addr) | |
if err != nil { | |
panic(err) | |
} | |
if info.IsDir() { | |
shimCommand(*addr, *con, *bat, *id) | |
return | |
} else { | |
if *grpc { | |
containerdCommand(*addr, *con, *bat) | |
} else { | |
eventCommand(*addr, *con, *bat, *size) | |
} | |
} | |
} | |
// tcpserver listens on a system-chosen TCP address, prints that address, and
// prints the remote address of every accepted connection until stopch is
// closed.
//
// Accepted connections are never read from. They are kept referenced and are
// closed, together with the listener, when the function returns. The
// function panics if the listener cannot be created.
func tcpserver(stopch <-chan struct{}) {
	ln, err := net.Listen("tcp", "")
	if err != nil {
		panic(err)
	}
	// Closing the listener on return also unblocks the accept goroutine.
	defer ln.Close()
	fmt.Println("listen address: ", ln.Addr().String())

	// Hold on to accepted connections so they are closed deliberately on
	// shutdown rather than simply dropped.
	var accepted []net.Conn
	defer func() {
		for _, c := range accepted {
			c.Close()
		}
	}()

	connCh := make(chan net.Conn, 1)
	go func() {
		for {
			conn, err := ln.Accept()
			if err != nil {
				select {
				case <-stopch:
					// Shutdown in progress: the listener has been closed.
					return
				default:
				}
				fmt.Printf("accept failed: %v\n", err)
				// Never hand a nil connection to the receiver.
				continue
			}

			select {
			case connCh <- conn:
			case <-stopch:
				conn.Close()
				return
			}
		}
	}()

	for {
		select {
		case <-stopch:
			return
		case conn := <-connCh:
			accepted = append(accepted, conn)
			fmt.Println("remote addr: ", conn.RemoteAddr())
		}
	}
}
// tcpclient connects to addr over TCP and writes a step-byte payload of 'a'
// characters in a loop, printing the running write count and the number of
// bytes written after each successful write.
//
// The first write error is reported as "send failed" and stops the writer.
// The function itself returns only when stopch is closed, at which point the
// connection is closed. It panics if the connection cannot be established.
func tcpclient(addr string, step int, stopch <-chan struct{}) {
	conn, err := net.Dial("tcp", addr)
	if err != nil {
		panic(err)
	}
	// Closing the connection on return also unblocks a writer stuck in Write.
	defer conn.Close()

	data := bytes.Repeat([]byte{'a'}, step)

	errch := make(chan error, 1)
	go func() {
		for i := 1; ; i++ {
			count, err := conn.Write(data)
			if err != nil {
				// Report the failure once and stop instead of spinning on a
				// broken connection.
				select {
				case errch <- err:
				case <-stopch:
				}
				return
			}
			fmt.Printf("write success,index %d byte length %d\n", i, count)
		}
	}()

	for {
		select {
		case err := <-errch:
			fmt.Println("send failed: ", err)
		case <-stopch:
			return
		}
	}
}
func tcprun() { | |
s := flag.Bool("s", false, "run as server, default true") | |
c := flag.Bool("c", false, "run as clent, default false") | |
addr := flag.String("addr", "", "server address") | |
step := flag.Int("step", 1000, "data step, unit: B") | |
flag.Parse() | |
stopch := make(chan struct{}) | |
if s != nil && *s { | |
tcpserver(stopch) | |
} | |
if c != nil && *c { | |
tcpclient(*addr, *step, stopch) | |
} | |
sigch := make(chan os.Signal, 2) | |
signal.Notify(sigch, []os.Signal{os.Interrupt, syscall.SIGTERM}...) | |
<-sigch | |
close(stopch) | |
} | |
func main() { | |
tcprun() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment