Skip to content

Instantly share code, notes, and snippets.

@yin1999
Last active April 12, 2024 02:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yin1999/7d168865300b5dfe374b0791152cff2c to your computer and use it in GitHub Desktop.
Save yin1999/7d168865300b5dfe374b0791152cff2c to your computer and use it in GitHub Desktop.
package main
import (
"encoding/binary"
"fmt"
"log"
stdlog "log"
"math/rand"
"net"
"os"
"strings"
// Blank-import the Envoy resource packages so that their protobuf message types are registered and can be looked up by type URL.
_ "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
_ "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
_ "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
_ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/tcp_proxy/v3"
"github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
"github.com/apache/dubbo-kubernetes/pkg/core"
"github.com/apache/dubbo-kubernetes/pkg/core/resources/model/rest/unversioned"
envoy_resource "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"go.uber.org/multierr"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoregistry"
"google.golang.org/protobuf/types/known/anypb"
rest_v1alpha1 "github.com/apache/dubbo-kubernetes/pkg/core/resources/model/rest/v1alpha1"
dubbo_log "github.com/apache/dubbo-kubernetes/pkg/log"
"github.com/apache/dubbo-kubernetes/tools/xds-client/stream"
)
// newRootCmd assembles the top-level "dubbo-xds-client" command.
//
// Before any sub-command runs, the command installs a debug-level logger via
// core.SetLogger. The only sub-command registered here is the one built by
// newRunCmd.
func newRootCmd() *cobra.Command {
	root := new(cobra.Command)
	root.Use = "dubbo-xds-client"
	root.Short = "dubbo xDS client"
	root.Long = `dubbo xDS client.`
	root.PersistentPreRun = func(_ *cobra.Command, _ []string) {
		debugLogger := core.NewLogger(dubbo_log.DebugLevel)
		core.SetLogger(debugLogger)
	}

	// sub-commands
	root.AddCommand(newRunCmd())

	return root
}
// newRunCmd builds the "run" sub-command, which starts --dps concurrent xDS
// clients ("Envoy simulators") against the server given by
// --xds-server-address.
//
// Each simulator connects, opens an xDS stream, requests Listeners, Clusters
// and Endpoints, and then loops forever: it waits for a discovery response,
// prints the received resources and ACKs the response. The command returns as
// soon as the first simulator terminates, wrapping that simulator's error.
//
// The --services, --inbounds and --outbounds flags are registered but are not
// read by the command body.
func newRunCmd() *cobra.Command {
	// NOTE: this local shadows the standard "log" package inside the function,
	// which is why the standard logger is reached through the stdlog alias.
	log := core.Log.WithName("dubbo-xds-client").WithName("run")
	args := struct {
		xdsServerAddress string
		dps              int
		services         int
		inbounds         int
		outbounds        int
	}{
		xdsServerAddress: "grpc://localhost:5678",
		dps:              1,
		services:         1,
		inbounds:         1,
		outbounds:        1,
	}
	cmd := &cobra.Command{
		Use:   "run",
		Short: "Start xDS client(s) that simulate Envoy",
		Long:  `Start xDS client(s) that simulate Envoy.`,
		RunE: func(cmd *cobra.Command, _ []string) error {
			// Without at least one simulator nothing would ever be sent on
			// errCh and the receive below would block forever.
			if args.dps < 1 {
				return errors.Errorf("--dps must be at least 1, got %d", args.dps)
			}

			ipRand := rand.Uint32() // #nosec G404 -- that's just a test tool
			log.Info("going to start xDS clients (Envoy simulators)", "dps", args.dps)

			// One slot per simulator: only the first error is consumed below,
			// so every other simulator must be able to report its result
			// without blocking on the send.
			errCh := make(chan error, args.dps)

			// Every simulator uses the same hard-coded dataplane identity;
			// only the generated IP address differs between them.
			const (
				namespace = "dubbo-system"
				mesh      = "default"
			)
			name := "dubbo-samples-apiserver-consumer-59c775d8fd-wbdz8" + "." + namespace
			id := mesh + "." + name

			for i := 0; i < args.dps; i++ {
				nodeLog := log.WithName("envoy-simulator").WithValues("idx", i, "ID", id)
				nodeLog.Info("creating an xDS client ...")
				go func(i int) {
					// Derive a distinct address per simulator from a random
					// base plus the simulator index.
					buf := make([]byte, 4)
					binary.LittleEndian.PutUint32(buf, ipRand+uint32(i))
					ip := net.IP(buf).String()
					dpSpec := &v1alpha1.Dataplane{
						Networking: &v1alpha1.Dataplane_Networking{
							Address: ip,
						},
					}
					dp := &unversioned.Resource{
						Meta: rest_v1alpha1.ResourceMeta{Mesh: mesh, Name: name, Type: "Dataplane"},
						Spec: dpSpec,
					}
					// errs is a named result so that the deferred close
					// handlers can append their errors to the returned value.
					errCh <- func() (errs error) {
						client, err := stream.New(args.xdsServerAddress)
						if err != nil {
							return errors.Wrap(err, "failed to connect to xDS server")
						}
						defer func() {
							nodeLog.Info("closing a connection ...")
							if err := client.Close(); err != nil {
								errs = multierr.Append(errs, errors.Wrapf(err, "failed to close a connection"))
							}
						}()
						nodeLog.Info("opening an xDS stream ...")
						// Named xdsStream so the imported stream package is
						// not shadowed.
						xdsStream, err := client.StartStream()
						if err != nil {
							return errors.Wrap(err, "failed to start an xDS stream")
						}
						defer func() {
							nodeLog.Info("closing an xDS stream ...")
							if err := xdsStream.Close(); err != nil {
								errs = multierr.Append(errs, errors.Wrapf(err, "failed to close an xDS stream"))
							}
						}()
						nodeLog.Info("requesting Listeners")
						e := xdsStream.Request(id, envoy_resource.ListenerType, dp)
						if e != nil {
							return errors.Wrapf(e, "failed to request %q", envoy_resource.ListenerType)
						}
						nodeLog.Info("requesting Clusters")
						e = xdsStream.Request(id, envoy_resource.ClusterType, dp)
						if e != nil {
							return errors.Wrapf(e, "failed to request %q", envoy_resource.ClusterType)
						}
						nodeLog.Info("requesting Endpoints")
						e = xdsStream.Request(id, envoy_resource.EndpointType, dp)
						if e != nil {
							return errors.Wrapf(e, "failed to request %q", envoy_resource.EndpointType)
						}
						// nodeLog.Info("endpoint resources:\n", formatResources(dp.GetSpec().))
						// Receive/ACK loop; it only exits by returning an error.
						for {
							nodeLog.Info("waiting for a discovery response ...")
							resp, err := xdsStream.WaitForResources()
							if err != nil {
								return errors.Wrap(err, "failed to receive a discovery response")
							}
							stdlog.Print("received xDS resources ", " type ", resp.TypeUrl, " version ", resp.VersionInfo, " nonce ", resp.Nonce, " resources ", len(resp.Resources), " resources: ", formatResources(resp.Resources))
							if err := xdsStream.ACK(resp.TypeUrl); err != nil {
								return errors.Wrap(err, "failed to ACK a discovery response")
							}
							nodeLog.Info("ACKed discovery response", "type", resp.TypeUrl, "version", resp.VersionInfo, "nonce", resp.Nonce)
						}
					}()
				}(i)
			}
			// Block until the first simulator terminates; the remaining ones
			// keep running until the process exits.
			err := <-errCh
			return errors.Wrap(err, "one of xDS clients (Envoy simulators) terminated with an error")
		},
	}
	// flags
	cmd.PersistentFlags().StringVar(&args.xdsServerAddress, "xds-server-address", args.xdsServerAddress, "address of xDS server")
	cmd.PersistentFlags().IntVar(&args.dps, "dps", args.dps, "number of dataplanes to emulate")
	cmd.PersistentFlags().IntVar(&args.services, "services", args.services, "number of services")
	cmd.PersistentFlags().IntVar(&args.inbounds, "inbounds", args.inbounds, "number of inbounds")
	cmd.PersistentFlags().IntVar(&args.outbounds, "outbounds", args.outbounds, "number of outbounds")
	return cmd
}
// formatResources converts every resource to its protojson text form and
// joins the results with " | ".
//
// A resource is skipped, with a message written through the standard logger,
// when its type URL cannot be resolved in protoregistry.GlobalTypes or when it
// cannot be unmarshalled into that message type.
func formatResources(resources []*anypb.Any) string {
	rendered := make([]string, 0, len(resources))
	for idx := range resources {
		res := resources[idx]
		typeURL := res.GetTypeUrl()

		// Resolve the concrete message type registered for this type URL.
		msgType, lookupErr := protoregistry.GlobalTypes.FindMessageByURL(typeURL)
		if lookupErr != nil {
			log.Printf("failed to find the type of the resource (%s): %v", typeURL, lookupErr)
			continue
		}

		// Decode the Any payload into a fresh instance of that type.
		decoded := msgType.New().Interface()
		unmarshalErr := anypb.UnmarshalTo(res, decoded, proto.UnmarshalOptions{})
		if unmarshalErr != nil {
			log.Printf("failed to unmarshal the resource: %v", unmarshalErr)
			continue
		}

		rendered = append(rendered, protojson.Format(decoded))
	}
	return strings.Join(rendered, " | ")
}
// main executes the root command and exits with status 1 when it fails.
func main() {
	if err := newRootCmd().Execute(); err != nil {
		// Report failures on stderr so that stdout is not mixed with
		// diagnostics.
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment