Skip to content

Instantly share code, notes, and snippets.

@yylt
Created April 26, 2022 02:09
Show Gist options
  • Save yylt/0d3f2d554fa7eddd9cafe406ef0c9d75 to your computer and use it in GitHub Desktop.
Save yylt/0d3f2d554fa7eddd9cafe406ef0c9d75 to your computer and use it in GitHub Desktop.
shimhang
package main
import (
"bytes"
"context"
"crypto/tls"
"flag"
"fmt"
"io/ioutil"
"log"
"math/rand"
"net"
"net/http"
"os"
"os/signal"
"path/filepath"
"strings"
"sync"
"syscall"
"time"
"github.com/containerd/containerd"
eventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/api/services/tasks/v1"
"github.com/containerd/containerd/api/services/ttrpc/events/v1"
"github.com/containerd/containerd/pkg/ttrpcutil"
shimclient "github.com/containerd/containerd/runtime/v2/shim"
"github.com/containerd/containerd/runtime/v2/task"
"github.com/containerd/ttrpc"
"github.com/containerd/typeurl"
ants "github.com/panjf2000/ants/v2"
)
// Package-level worker pool configuration.
var (
// defaultP is the shared ants goroutine pool; it is created in init and
// used by mkShimBatch to run the shim Stats calls.
defaultP *ants.Pool
// capa is the capacity passed to ants.NewPool when defaultP is created.
capa = 8
)
// init builds the shared worker pool with capacity capa and stores it in
// defaultP. It panics if the pool cannot be created.
func init() {
	var err error
	if defaultP, err = ants.NewPool(capa); err != nil {
		panic(err)
	}
}
// cid holds the task ID (or ID prefix) to look up. Shim sets it from the
// -id flag, and getTaskId replaces it with the full matching task ID.
var (
cid = ""
)
// getTaskId lists the tasks known to cli and returns the ID of the first one
// whose ID starts with the package-level cid. On a match, cid is overwritten
// with the full task ID.
//
// It panics when the list call fails, when cid is empty, or when no task ID
// has cid as a prefix.
func getTaskId(cli *containerd.Client) string {
	resp, err := cli.TaskService().List(context.Background(), &tasks.ListTasksRequest{})
	if err != nil {
		panic(err)
	}
	if cid == "" {
		panic("taskid not define")
	}
	for _, t := range resp.Tasks {
		if !strings.HasPrefix(t.ID, cid) {
			continue
		}
		// Remember the fully-qualified ID for later use.
		cid = t.ID
		return cid
	}
	panic(fmt.Errorf("not found taskid %s", cid))
}
// mkShimConnPool opens concurrency connections to a shim.
//
// addr is a directory that contains a file named "address"; the content of
// that file is passed to shimclient.Connect as the shim address. The function
// panics if the file cannot be read or if any connection attempt fails.
func mkShimConnPool(addr string, concurrency int) []net.Conn {
	pool := make([]net.Conn, concurrency)

	raw, err := ioutil.ReadFile(filepath.Join(addr, "address"))
	if err != nil {
		panic(err)
	}
	shimAddr := string(raw)

	for idx := range pool {
		c, dialErr := shimclient.Connect(shimAddr, shimclient.AnonReconnectDialer)
		if dialErr != nil {
			panic(dialErr)
		}
		pool[idx] = c
	}
	return pool
}
// shim 并发执行命令
// 并发数 batch
func mkShimBatch(cid string, connch []net.Conn, batch int) {
var (
ctx = context.Background()
wg sync.WaitGroup
_, cancle = context.WithTimeout(ctx, time.Second*3)
)
defer cancle()
for _, v := range connch {
wg.Add(1)
tmpv := v
go func() {
defer wg.Done()
tcli := ttrpc.NewClient(tmpv)
client := task.NewTaskClient(tcli)
// err := tmpv.SetReadDeadline(time.Now().Add(time.Microsecond * 300))
// if err != nil {
// cancle()
// fmt.Println("set read timeout failed:%v", err)
// return
// }
for i := 0; i < batch; i++ {
defaultP.Submit(func() {
_, err := client.Stats(ctx, &task.StatsRequest{ID: cid})
if err != nil {
fmt.Printf("index %d status failed: %s\n", i, err)
} else {
fmt.Printf("index %d cid: %s \n ", i, cid)
}
})
}
}()
}
wg.Wait()
}
// mkContainerdConnPool creates a pool of containerd clients: number
// independent clients connected to addr, each using the "k8s.io" default
// namespace. It panics if any client cannot be created.
func mkContainerdConnPool(addr string, number int) []*containerd.Client {
	clients := make([]*containerd.Client, number)
	for idx := range clients {
		c, err := containerd.New(addr, containerd.WithDefaultNamespace("k8s.io"))
		if err != nil {
			panic(err)
		}
		clients[idx] = c
	}
	return clients
}
// mkContainerdBatch runs containerd task Metrics calls concurrently.
//
// One goroutine is started per client in cc; each goroutine issues batch
// sequential Metrics requests and prints the outcome of every call. All
// requests share one context with a 3 second deadline so that a hung daemon
// surfaces as an error rather than blocking forever. The function returns
// once every goroutine has finished.
func mkContainerdBatch(cc []*containerd.Client, batch int) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
	defer cancel()

	var wg sync.WaitGroup
	fmt.Println("start rpc call")
	wg.Add(len(cc))
	for _, v := range cc {
		ccli := v
		go func() {
			defer wg.Done()
			for i := 0; i < batch; i++ {
				_, err := ccli.TaskService().Metrics(ctx, &tasks.MetricsRequest{})
				if err != nil {
					fmt.Printf("task Metrics failed: %s\n", err)
				} else {
					fmt.Printf("task Metrics ok on index %d\n", i)
				}
			}
		}()
	}
	wg.Wait()
}
func containerdCommand(addr string, con, bat int) {
conns := mkContainerdConnPool(addr, con)
mkContainerdBatch(conns, bat)
}
func shimCommand(addr string, concurrency int, batch int, id string) {
connch := mkShimConnPool(addr, concurrency)
mkShimBatch(id, connch, batch)
}
// ebk creates concurrency ttrpc clients connected to addr and returns the
// events service obtained from each one. It panics if a client cannot be
// created or if its events service cannot be obtained.
func ebk(addr string, concurrency int) []events.EventsService {
	services := make([]events.EventsService, concurrency)
	for idx := range services {
		client, err := ttrpcutil.NewClient(addr)
		if err != nil {
			panic(err)
		}
		svc, err := client.EventsService()
		if err != nil {
			panic(err)
		}
		services[idx] = svc
	}
	return services
}
// mkstr returns a string made of size 'a' characters.
//
// A size of zero or less yields the empty string. The previous
// implementation seeded its buffer with size zero bytes before appending,
// which produced a string twice as long as requested with a NUL prefix.
func mkstr(size int) string {
	if size <= 0 {
		return ""
	}
	return strings.Repeat("a", size)
}
// mkRandom builds a label map whose number of entries grows with size.
//
// The map always starts with one entry whose key and value are both
// mkstr(keysize). After that, one more entry is added for every keysize*2
// units that fit into size; each of those entries uses the same string for
// key and value: mkstr(keysize) followed by a running index. Despite the
// name, the content is deterministic.
func mkRandom(size int) map[string]string {
	const keysize = 10
	const step = keysize * 2

	labels := map[string]string{mkstr(keysize): mkstr(keysize)}
	for i, remaining := 0, size-step; remaining >= 0; i, remaining = i+1, remaining-step {
		label := fmt.Sprintf("%s%d", mkstr(keysize), i)
		labels[label] = label
	}
	return labels
}
// ect forwards a synthetic NamespaceCreate event through every events
// service in cc.
//
// The event carries the labels produced by mkRandom(size), which lets the
// caller control the payload size. One goroutine is started per service and
// each goroutine forwards the same request batch times, printing the result
// of every call. All calls share one context with a 3 second deadline so a
// stuck receiver is reported as an error instead of blocking forever.
//
// It panics if the event cannot be marshalled.
func ect(cc []events.EventsService, batch int, size int) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
	defer cancel()

	payload, err := typeurl.MarshalAny(&eventstypes.NamespaceCreate{
		Name:   "foo",
		Labels: mkRandom(size),
	})
	if err != nil {
		panic(err)
	}
	req := &events.ForwardRequest{
		Envelope: &events.Envelope{
			Timestamp: time.Now(),
			Namespace: "foo",
			Topic:     "/namespaces/create",
			Event:     payload,
		},
	}

	var wg sync.WaitGroup
	fmt.Println("start rpc call")
	wg.Add(len(cc))
	for _, v := range cc {
		ccli := v
		go func() {
			defer wg.Done()
			for i := 0; i < batch; i++ {
				_, err := ccli.Forward(ctx, req)
				if err != nil {
					fmt.Printf("event forward failed: %s\n", err)
				} else {
					fmt.Printf("event forward ok on index %d\n", i)
				}
			}
		}()
	}
	wg.Wait()
}
func eventCommand(addr string, concurrency int, batch int, size int) {
ss := ebk(addr, concurrency)
ect(ss, batch, size)
}
// Stats fires concurrent requests at the /stats/summary endpoint of a node.
//
// Flags: -n host:port to query, -l number of concurrent requests, -c client
// certificate file, -k client key file. The TLS configuration presents the
// client certificate and skips verification of the server certificate.
// Each request is timed by get; failures are logged. The function returns
// after all requests have finished and panics if the key pair cannot be
// loaded.
func Stats() {
	rand.Seed(time.Now().Unix())

	hostport := flag.String("n", "node-1:10250", "hostport to query")
	count := flag.Int("l", 3, "list stats at once")
	certFile := flag.String("c", "", "cert file")
	keyFile := flag.String("k", "", "key file")
	flag.Parse()

	pair, err := tls.LoadX509KeyPair(*certFile, *keyFile)
	if err != nil {
		panic(err)
	}

	dialer := &net.Dialer{
		Timeout:   30 * time.Second,
		KeepAlive: 30 * time.Second,
	}
	tlsCfg := &tls.Config{
		InsecureSkipVerify: true,
		Certificates:       []tls.Certificate{pair},
	}
	transport := &http.Transport{
		TLSClientConfig:       tlsCfg,
		DialContext:           dialer.DialContext,
		ForceAttemptHTTP2:     true,
		MaxIdleConns:          100,
		IdleConnTimeout:       90 * time.Second,
		TLSHandshakeTimeout:   10 * time.Second,
		ExpectContinueTimeout: 1 * time.Second,
	}

	fmt.Printf("run call to stats %d times\n", *count)

	var pending sync.WaitGroup
	for n := 0; n < *count; n++ {
		pending.Add(1)
		go func() {
			defer pending.Done()
			target := fmt.Sprintf("https://%s/stats/summary?only_cpu_and_memory=true", *hostport)
			if _, reqErr := get(target, transport); reqErr != nil {
				log.Printf("didn't find stats for container %v", reqErr)
			}
		}()
	}
	pending.Wait()
}
// timing records the current time and returns a function that, when called,
// prints how long ago that was, labelled with what. Typical use:
//
//	defer timing("step")()
//
// Source: https://stackoverflow.com/a/45766707/697126
func timing(what string) func() {
	began := time.Now()
	report := func() {
		elapsed := time.Since(began)
		fmt.Printf("%s took %v\n", what, elapsed)
	}
	return report
}
// get performs an HTTP GET on url using cfg as the transport and returns the
// response body as a string. The duration of the whole call is printed via
// timing.
//
// The transport is attached to a request-local http.Client rather than being
// written to the global http.DefaultTransport: get is called from several
// goroutines at once, and assigning the global from each of them is a data
// race that also changes the transport for unrelated callers. A nil cfg
// falls back to the default transport.
//
// The body is returned regardless of the HTTP status code; an error is
// returned only when the request or reading the body fails.
func get(url string, cfg *http.Transport) (string, error) {
	defer timing(fmt.Sprintf("timing: %s", url))()

	client := &http.Client{}
	if cfg != nil {
		// Only set the field for a non-nil pointer; a nil *http.Transport
		// stored in the RoundTripper interface would not compare equal to nil.
		client.Transport = cfg
	}

	response, err := client.Get(url)
	if err != nil {
		return "", err
	}
	defer response.Body.Close()

	contents, err := ioutil.ReadAll(response.Body)
	if err != nil {
		return "", err
	}
	return string(contents), nil
}
// Shim is the entry point for the shim / containerd stress commands.
//
// The -a flag selects the target. When it names a directory, the directory
// is treated as a shim bundle and Stats calls for task -id are issued
// through shimCommand. Otherwise it is used as a daemon address: with -g the
// containerd task Metrics call is exercised, without -g events with a label
// payload derived from -s are forwarded.
//
// -c sets the number of connections and -b the number of calls made on each
// connection. The function panics if the -a path cannot be stat'ed.
func Shim() {
	id := flag.String("id", "", "task id used for the shim Stats calls")
	addr := flag.String("a", "", "shim bundle directory, or daemon socket address")
	con := flag.Int("c", 2, "concurrency number")
	bat := flag.Int("b", 3, "batch call in one connect")
	size := flag.Int("s", 1024, "size used to build the label payload of forwarded events")
	grpc := flag.Bool("g", false, "call containerd task Metrics instead of forwarding events")
	flag.Parse()

	cid = *id
	fmt.Println(*con, *bat)

	info, err := os.Stat(*addr)
	if err != nil {
		panic(err)
	}

	switch {
	case info.IsDir():
		shimCommand(*addr, *con, *bat, *id)
	case *grpc:
		containerdCommand(*addr, *con, *bat)
	default:
		eventCommand(*addr, *con, *bat, *size)
	}
}
// tcpserver listens on an automatically chosen TCP port, prints the listen
// address, and prints the remote address of every accepted connection until
// stopch is closed.
//
// Accepted connections are kept open but never read from. They, and the
// listener, are closed when the function returns. A failed Accept is logged
// and retried after a short pause; it is never forwarded as a connection,
// which previously caused a nil dereference. The accept goroutine exits once
// stopch is closed.
//
// It panics if the listener cannot be created.
func tcpserver(stopch <-chan struct{}) {
	ln, err := net.Listen("tcp", "")
	if err != nil {
		panic(err)
	}
	// Closing the listener also unblocks the pending Accept on shutdown.
	defer ln.Close()
	fmt.Println("listen address: ", ln.Addr().String())

	connCh := make(chan net.Conn, 1)
	go func() {
		for {
			conn, err := ln.Accept()
			if err != nil {
				select {
				case <-stopch:
					// Listener was closed as part of shutdown.
					return
				default:
				}
				fmt.Printf("accept failed: %v\n", err)
				// Back off briefly so a persistent error does not spin.
				time.Sleep(100 * time.Millisecond)
				continue
			}
			select {
			case connCh <- conn:
			case <-stopch:
				conn.Close()
				return
			}
		}
	}()

	// Hold on to accepted connections so they stay open until shutdown.
	var accepted []net.Conn
	defer func() {
		for _, c := range accepted {
			c.Close()
		}
	}()

	for {
		select {
		case <-stopch:
			return
		case conn := <-connCh:
			fmt.Println("remote addr: ", conn.RemoteAddr())
			accepted = append(accepted, conn)
		}
	}
}
// tcpclient connects to addr and keeps writing step-byte chunks of 'a' to
// the connection until a write fails or stopch is closed.
//
// Every successful write is printed with its running index and byte count.
// The first write error is printed as "send failed" and stops the writer;
// the function then keeps waiting for stopch. The connection is closed when
// the function returns, which also unblocks a writer that is stuck in Write,
// so the writer goroutine does not outlive the call.
//
// It panics if the connection cannot be established.
func tcpclient(addr string, step int, stopch <-chan struct{}) {
	conn, err := net.Dial("tcp", addr)
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	data := make([]byte, step)
	for i := range data {
		data[i] = 'a'
	}

	errch := make(chan error, 1)
	go func() {
		for i := 1; ; i++ {
			count, err := conn.Write(data)
			if err != nil {
				// Report the failure unless we are already shutting down.
				select {
				case errch <- err:
				case <-stopch:
				}
				return
			}
			fmt.Printf("write success,index %d byte length %d\n", i, count)
		}
	}()

	for {
		select {
		case err := <-errch:
			fmt.Println("send failed: ", err)
		case <-stopch:
			return
		}
	}
}
// tcprun parses the TCP tool flags, starts the server (-s) and/or the client
// (-c, using -addr and -step), and runs until SIGINT or SIGTERM arrives.
//
// The server and client run in their own goroutines. Both block until stopch
// is closed, so calling them inline would keep the signal handling below
// from ever being reached. On a signal stopch is closed and the function
// waits for the started goroutines to return.
func tcprun() {
	s := flag.Bool("s", false, "run as server, default false")
	c := flag.Bool("c", false, "run as client, default false")
	addr := flag.String("addr", "", "server address")
	step := flag.Int("step", 1000, "data step, unit: B")
	flag.Parse()

	// Register for signals before starting any work so none are missed.
	sigch := make(chan os.Signal, 2)
	signal.Notify(sigch, os.Interrupt, syscall.SIGTERM)

	stopch := make(chan struct{})
	var wg sync.WaitGroup
	if *s {
		wg.Add(1)
		go func() {
			defer wg.Done()
			tcpserver(stopch)
		}()
	}
	if *c {
		wg.Add(1)
		go func() {
			defer wg.Done()
			tcpclient(*addr, *step, stopch)
		}()
	}

	<-sigch
	close(stopch)
	wg.Wait()
}
// main runs the TCP client/server tool. Stats and Shim are alternative entry
// points defined in this file that are not called from here.
func main() {
tcprun()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment