Skip to content

Instantly share code, notes, and snippets.

@fxposter
Created September 7, 2020 20:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fxposter/8887788a575090601ba6b106e80e3230 to your computer and use it in GitHub Desktop.
Save fxposter/8887788a575090601ba6b106e80e3230 to your computer and use it in GitHub Desktop.
envoy control plane
module example.com/envoy-cp
go 1.14
require (
github.com/envoyproxy/go-control-plane v0.9.5
github.com/golang/protobuf v1.3.2
google.golang.org/grpc v1.25.1
)
package main
import (
"context"
"fmt"
api "github.com/envoyproxy/go-control-plane/envoy/api/v2"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
cachetypes "github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/cache/v2"
xds "github.com/envoyproxy/go-control-plane/pkg/server/v2"
"github.com/envoyproxy/go-control-plane/pkg/wellknown"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes/any"
"github.com/golang/protobuf/ptypes/duration"
"github.com/golang/protobuf/ptypes/wrappers"
"golang.org/x/exp/rand"
"google.golang.org/grpc"
"log"
"net"
"time"
v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2"
cluster "github.com/envoyproxy/go-control-plane/envoy/api/v2/cluster"
core "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
listener "github.com/envoyproxy/go-control-plane/envoy/api/v2/listener"
route "github.com/envoyproxy/go-control-plane/envoy/api/v2/route"
hcm "github.com/envoyproxy/go-control-plane/envoy/config/filter/network/http_connection_manager/v2"
endpoint "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
)
type logger struct {
}
func (logger) Debugf(format string, args ...interface{}) {
//log.Printf(format, args...)
}
func (logger) Infof(format string, args ...interface{}) {
log.Printf(format, args...)
}
func (logger) Warnf(format string, args ...interface{}) {
log.Printf(format, args...)
}
func (logger) Errorf(format string, args ...interface{}) {
log.Printf(format, args...)
}
type callbacks struct {
}
func (c callbacks) OnStreamOpen(context.Context, int64, string) error {
return nil
}
func (c callbacks) OnStreamClosed(int64) {
}
func (c callbacks) OnStreamRequest(_ int64, req *v2.DiscoveryRequest) error {
log.Printf("[request] type: %v, version: %v, resources: %v", req.TypeUrl, req.VersionInfo, len(req.ResourceNames))
return nil
}
func (c callbacks) OnStreamResponse(_ int64, req *v2.DiscoveryRequest, res *v2.DiscoveryResponse) {
log.Printf("[response] type: %v, request version: %v, response version: %v, resources: %v", req.TypeUrl, req.VersionInfo, res.VersionInfo, len(res.Resources))
}
func (c callbacks) OnFetchRequest(context.Context, *v2.DiscoveryRequest) error {
return nil
}
func (c callbacks) OnFetchResponse(*v2.DiscoveryRequest, *v2.DiscoveryResponse) {
}
func main() {
snapshotCache := cache.NewSnapshotCache(false, cache.IDHash{}, logger{})
server := xds.NewServer(context.Background(), snapshotCache, callbacks{})
grpcServer := grpc.NewServer()
lis, _ := net.Listen("tcp", ":8080")
discovery.RegisterAggregatedDiscoveryServiceServer(grpcServer, server)
api.RegisterEndpointDiscoveryServiceServer(grpcServer, server)
api.RegisterClusterDiscoveryServiceServer(grpcServer, server)
api.RegisterRouteDiscoveryServiceServer(grpcServer, server)
api.RegisterListenerDiscoveryServiceServer(grpcServer, server)
go func() {
if err := grpcServer.Serve(lis); err != nil {
// error handling
}
}()
var baseNames []int
for i := 0; i < 1000; i++ {
baseNames = append(baseNames, i)
}
for i := 0; ;i++ {
var additionalNames []int
for i := 0; i < 5; i++ {
additionalNames = append(additionalNames, int(rand.Int31() + 1000))
}
additionalNames = append(additionalNames, baseNames...)
key := 0
for _, i := range additionalNames {
key += i
}
snapshot := cache.Snapshot{}
snapshot.Resources[cachetypes.Listener] = cache.NewResources("1", listeners())
snapshot.Resources[cachetypes.Route] = cache.NewResources(fmt.Sprintf("%d", key), routes(additionalNames))
snapshot.Resources[cachetypes.Cluster] = cache.NewResources(fmt.Sprintf("%d", key), clusters(additionalNames))
snapshot.Resources[cachetypes.Endpoint] = cache.NewResources(fmt.Sprintf("%d", key), endpoints(additionalNames))
snapshotCache.SetSnapshot("router", snapshot)
if i == 0 {
time.Sleep(10 * time.Second)
}
time.Sleep(3*time.Second)
}
}
func BuildConfigSource() *core.ConfigSource {
return &core.ConfigSource{
ConfigSourceSpecifier: &core.ConfigSource_Ads{
Ads: &core.AggregatedConfigSource{},
},
}
}
func listeners() []cachetypes.Resource {
manager := &hcm.HttpConnectionManager{
Tracing: &hcm.HttpConnectionManager_Tracing{},
CodecType: hcm.HttpConnectionManager_AUTO,
HttpProtocolOptions: &core.Http1ProtocolOptions{
AcceptHttp_10: true,
},
CommonHttpProtocolOptions: &core.HttpProtocolOptions{
},
StatPrefix: "http",
RouteSpecifier: &hcm.HttpConnectionManager_Rds{
Rds: &hcm.Rds{
ConfigSource: BuildConfigSource(),
RouteConfigName: "route",
},
},
HttpFilters: []*hcm.HttpFilter{
{
Name: wellknown.Router,
},
},
}
return []cachetypes.Resource{
&v2.Listener{
Name: "listener",
TrafficDirection: core.TrafficDirection_OUTBOUND,
Address: &core.Address{
Address: &core.Address_SocketAddress{
SocketAddress: &core.SocketAddress{
Protocol: core.SocketAddress_TCP,
Address: "0.0.0.0",
PortSpecifier: &core.SocketAddress_PortValue{
PortValue: 8080,
},
},
},
},
FilterChains: []*listener.FilterChain{{
Filters: []*listener.Filter{{
Name: wellknown.HTTPConnectionManager,
ConfigType: &listener.Filter_TypedConfig{
TypedConfig: ToAnyOrPanic(manager),
},
}},
}},
},
}
}
func clusters(names []int) []cachetypes.Resource {
var result []cachetypes.Resource
for _, i := range names {
result = append(result, buildCluster(fmt.Sprintf("egress_com__wixpress__kickstart-zero-to-prod-kseniia-mazan2_main-%d", i)))
}
return result
}
func buildCluster(name string) *v2.Cluster {
return &v2.Cluster{
Name: name,
ConnectTimeout: &duration.Duration{Seconds: 5},
ClusterDiscoveryType: &v2.Cluster_Type{
Type: v2.Cluster_EDS,
},
ProtocolSelection: v2.Cluster_USE_DOWNSTREAM_PROTOCOL,
CommonHttpProtocolOptions: &core.HttpProtocolOptions{
IdleTimeout: &duration.Duration{Seconds: 5},
},
HttpProtocolOptions: &core.Http1ProtocolOptions{
AcceptHttp_10: true,
},
Http2ProtocolOptions: &core.Http2ProtocolOptions{},
MaxRequestsPerConnection: &wrappers.UInt32Value{
Value: 1000, // Probably we need to move it to some settings
},
CircuitBreakers: &cluster.CircuitBreakers{
Thresholds: []*cluster.CircuitBreakers_Thresholds{{
Priority: core.RoutingPriority_DEFAULT,
MaxConnections: &wrappers.UInt32Value{Value: 2048},
MaxPendingRequests: &wrappers.UInt32Value{Value: 2048},
MaxRequests: &wrappers.UInt32Value{Value: 2048},
MaxRetries: &wrappers.UInt32Value{Value: 60},
}, {
Priority: core.RoutingPriority_HIGH,
MaxConnections: &wrappers.UInt32Value{Value: 2048},
MaxPendingRequests: &wrappers.UInt32Value{Value: 2048},
MaxRequests: &wrappers.UInt32Value{Value: 2048},
MaxRetries: &wrappers.UInt32Value{Value: 60},
}},
},
UpstreamConnectionOptions: &v2.UpstreamConnectionOptions{
TcpKeepalive: &core.TcpKeepalive{
KeepaliveProbes: &wrappers.UInt32Value{
Value: 2,
},
KeepaliveInterval: &wrappers.UInt32Value{
Value: 30,
},
},
},
OutlierDetection: &cluster.OutlierDetection{
SuccessRateMinimumHosts: &wrappers.UInt32Value{Value: 2},
EnforcingConsecutiveGatewayFailure: &wrappers.UInt32Value{Value: 100},
},
EdsClusterConfig: &v2.Cluster_EdsClusterConfig{
EdsConfig: BuildConfigSource(),
},
}
}
func endpoints(names []int) []cachetypes.Resource {
var result []cachetypes.Resource
for _, i := range names {
e := &v2.ClusterLoadAssignment{
ClusterName: fmt.Sprintf("egress_com__wixpress__kickstart-zero-to-prod-kseniia-mazan2_main-%d", i),
Endpoints: []*endpoint.LocalityLbEndpoints{
{
Locality: &core.Locality{Region: "us-east-1",},
LbEndpoints: []*endpoint.LbEndpoint{
//{
// HostIdentifier: &endpoint.LbEndpoint_Endpoint{
// Endpoint: &endpoint.Endpoint{
// Address: &core.Address{
// Address: &core.Address_SocketAddress{
// SocketAddress: &core.SocketAddress{
// Protocol: core.SocketAddress_TCP,
// Address: "127.0.0.1",
// PortSpecifier: &core.SocketAddress_PortValue{
// PortValue: 9090,
// },
// },
// },
// },
// },
// },
// HealthStatus: core.HealthStatus_HEALTHY,
//},
},
Priority: 0,
},
},
}
result = append(result, e)
}
return result
}
func routes(names []int) []cachetypes.Resource {
var rr []*route.Route
var vhh []*route.VirtualHost
for _, i := range names {
r := &route.Route{
Match: &route.RouteMatch{
PathSpecifier: &route.RouteMatch_Prefix{
Prefix: fmt.Sprintf("/route-%d", i),
},
},
Action: &route.Route_Route{
Route: &route.RouteAction{
PrefixRewrite: fmt.Sprintf("/cluster-%d", i),
ClusterSpecifier: &route.RouteAction_Cluster{
Cluster: fmt.Sprintf("egress_com__wixpress__kickstart-zero-to-prod-kseniia-mazan2_main-%d", i),
},
RetryPolicy: &route.RetryPolicy{
RetryOn: "retriable-status-codes,refused-stream,connect-failure,unavailable",
NumRetries: &wrappers.UInt32Value{Value: 5},
RetriableStatusCodes: []uint32{503, 2, 14},
RetryHostPredicate: []*route.RetryPolicy_RetryHostPredicate{
{
Name: "envoy.retry_host_predicates.previous_hosts",
},
},
HostSelectionRetryMaxAttempts: 4,
},
},
},
}
rr = append(rr, r)
vh := &route.VirtualHost{
Name: fmt.Sprintf("vh-%d", i),
Domains: []string{fmt.Sprintf("domain-%d", i)},
Routes: []*route.Route{r},
}
vhh = append(vhh, vh)
}
vhh = append(vhh, &route.VirtualHost{
Name: "wildcard-common-ingress",
Domains: []string{"*"},
Routes: rr,
})
routeConfiguration := &v2.RouteConfiguration{
Name: "route",
VirtualHosts: vhh,
}
return []cachetypes.Resource{routeConfiguration}
}
func ToAnyOrPanic(message proto.Message) *any.Any {
value, err := cache.MarshalResource(message)
if err != nil {
panic(err)
}
return &any.Any{TypeUrl: "type.googleapis.com/" + proto.MessageName(message), Value: value}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment