Created
September 7, 2020 20:55
-
-
Save fxposter/8887788a575090601ba6b106e80e3230 to your computer and use it in GitHub Desktop.
envoy control plane
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module example.com/envoy-cp | |
go 1.14 | |
require ( | |
github.com/envoyproxy/go-control-plane v0.9.5 | |
github.com/golang/protobuf v1.3.2 | |
google.golang.org/grpc v1.25.1 | |
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"fmt" | |
api "github.com/envoyproxy/go-control-plane/envoy/api/v2" | |
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" | |
cachetypes "github.com/envoyproxy/go-control-plane/pkg/cache/types" | |
"github.com/envoyproxy/go-control-plane/pkg/cache/v2" | |
xds "github.com/envoyproxy/go-control-plane/pkg/server/v2" | |
"github.com/envoyproxy/go-control-plane/pkg/wellknown" | |
"github.com/golang/protobuf/proto" | |
"github.com/golang/protobuf/ptypes/any" | |
"github.com/golang/protobuf/ptypes/duration" | |
"github.com/golang/protobuf/ptypes/wrappers" | |
"golang.org/x/exp/rand" | |
"google.golang.org/grpc" | |
"log" | |
"net" | |
"time" | |
v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2" | |
cluster "github.com/envoyproxy/go-control-plane/envoy/api/v2/cluster" | |
core "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" | |
listener "github.com/envoyproxy/go-control-plane/envoy/api/v2/listener" | |
route "github.com/envoyproxy/go-control-plane/envoy/api/v2/route" | |
hcm "github.com/envoyproxy/go-control-plane/envoy/config/filter/network/http_connection_manager/v2" | |
endpoint "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint" | |
) | |
// logger is a stateless adapter that exposes leveled, printf-style logging
// methods on top of the standard library log package. main passes a logger
// value to cache.NewSnapshotCache.
type logger struct{}

// Debugf intentionally drops debug-level messages. The forwarding call is
// kept commented out so verbose output can be switched back on by hand.
func (l logger) Debugf(format string, args ...interface{}) {
	//log.Printf(format, args...)
}

// Infof writes an info-level message through the standard logger.
func (l logger) Infof(format string, args ...interface{}) {
	log.Printf(format, args...)
}

// Warnf writes a warning-level message through the standard logger.
func (l logger) Warnf(format string, args ...interface{}) {
	log.Printf(format, args...)
}

// Errorf writes an error-level message through the standard logger.
func (l logger) Errorf(format string, args ...interface{}) {
	log.Printf(format, args...)
}
type callbacks struct { | |
} | |
func (c callbacks) OnStreamOpen(context.Context, int64, string) error { | |
return nil | |
} | |
func (c callbacks) OnStreamClosed(int64) { | |
} | |
func (c callbacks) OnStreamRequest(_ int64, req *v2.DiscoveryRequest) error { | |
log.Printf("[request] type: %v, version: %v, resources: %v", req.TypeUrl, req.VersionInfo, len(req.ResourceNames)) | |
return nil | |
} | |
func (c callbacks) OnStreamResponse(_ int64, req *v2.DiscoveryRequest, res *v2.DiscoveryResponse) { | |
log.Printf("[response] type: %v, request version: %v, response version: %v, resources: %v", req.TypeUrl, req.VersionInfo, res.VersionInfo, len(res.Resources)) | |
} | |
func (c callbacks) OnFetchRequest(context.Context, *v2.DiscoveryRequest) error { | |
return nil | |
} | |
func (c callbacks) OnFetchResponse(*v2.DiscoveryRequest, *v2.DiscoveryResponse) { | |
} | |
func main() { | |
snapshotCache := cache.NewSnapshotCache(false, cache.IDHash{}, logger{}) | |
server := xds.NewServer(context.Background(), snapshotCache, callbacks{}) | |
grpcServer := grpc.NewServer() | |
lis, _ := net.Listen("tcp", ":8080") | |
discovery.RegisterAggregatedDiscoveryServiceServer(grpcServer, server) | |
api.RegisterEndpointDiscoveryServiceServer(grpcServer, server) | |
api.RegisterClusterDiscoveryServiceServer(grpcServer, server) | |
api.RegisterRouteDiscoveryServiceServer(grpcServer, server) | |
api.RegisterListenerDiscoveryServiceServer(grpcServer, server) | |
go func() { | |
if err := grpcServer.Serve(lis); err != nil { | |
// error handling | |
} | |
}() | |
var baseNames []int | |
for i := 0; i < 1000; i++ { | |
baseNames = append(baseNames, i) | |
} | |
for i := 0; ;i++ { | |
var additionalNames []int | |
for i := 0; i < 5; i++ { | |
additionalNames = append(additionalNames, int(rand.Int31() + 1000)) | |
} | |
additionalNames = append(additionalNames, baseNames...) | |
key := 0 | |
for _, i := range additionalNames { | |
key += i | |
} | |
snapshot := cache.Snapshot{} | |
snapshot.Resources[cachetypes.Listener] = cache.NewResources("1", listeners()) | |
snapshot.Resources[cachetypes.Route] = cache.NewResources(fmt.Sprintf("%d", key), routes(additionalNames)) | |
snapshot.Resources[cachetypes.Cluster] = cache.NewResources(fmt.Sprintf("%d", key), clusters(additionalNames)) | |
snapshot.Resources[cachetypes.Endpoint] = cache.NewResources(fmt.Sprintf("%d", key), endpoints(additionalNames)) | |
snapshotCache.SetSnapshot("router", snapshot) | |
if i == 0 { | |
time.Sleep(10 * time.Second) | |
} | |
time.Sleep(3*time.Second) | |
} | |
} | |
func BuildConfigSource() *core.ConfigSource { | |
return &core.ConfigSource{ | |
ConfigSourceSpecifier: &core.ConfigSource_Ads{ | |
Ads: &core.AggregatedConfigSource{}, | |
}, | |
} | |
} | |
func listeners() []cachetypes.Resource { | |
manager := &hcm.HttpConnectionManager{ | |
Tracing: &hcm.HttpConnectionManager_Tracing{}, | |
CodecType: hcm.HttpConnectionManager_AUTO, | |
HttpProtocolOptions: &core.Http1ProtocolOptions{ | |
AcceptHttp_10: true, | |
}, | |
CommonHttpProtocolOptions: &core.HttpProtocolOptions{ | |
}, | |
StatPrefix: "http", | |
RouteSpecifier: &hcm.HttpConnectionManager_Rds{ | |
Rds: &hcm.Rds{ | |
ConfigSource: BuildConfigSource(), | |
RouteConfigName: "route", | |
}, | |
}, | |
HttpFilters: []*hcm.HttpFilter{ | |
{ | |
Name: wellknown.Router, | |
}, | |
}, | |
} | |
return []cachetypes.Resource{ | |
&v2.Listener{ | |
Name: "listener", | |
TrafficDirection: core.TrafficDirection_OUTBOUND, | |
Address: &core.Address{ | |
Address: &core.Address_SocketAddress{ | |
SocketAddress: &core.SocketAddress{ | |
Protocol: core.SocketAddress_TCP, | |
Address: "0.0.0.0", | |
PortSpecifier: &core.SocketAddress_PortValue{ | |
PortValue: 8080, | |
}, | |
}, | |
}, | |
}, | |
FilterChains: []*listener.FilterChain{{ | |
Filters: []*listener.Filter{{ | |
Name: wellknown.HTTPConnectionManager, | |
ConfigType: &listener.Filter_TypedConfig{ | |
TypedConfig: ToAnyOrPanic(manager), | |
}, | |
}}, | |
}}, | |
}, | |
} | |
} | |
func clusters(names []int) []cachetypes.Resource { | |
var result []cachetypes.Resource | |
for _, i := range names { | |
result = append(result, buildCluster(fmt.Sprintf("egress_com__wixpress__kickstart-zero-to-prod-kseniia-mazan2_main-%d", i))) | |
} | |
return result | |
} | |
func buildCluster(name string) *v2.Cluster { | |
return &v2.Cluster{ | |
Name: name, | |
ConnectTimeout: &duration.Duration{Seconds: 5}, | |
ClusterDiscoveryType: &v2.Cluster_Type{ | |
Type: v2.Cluster_EDS, | |
}, | |
ProtocolSelection: v2.Cluster_USE_DOWNSTREAM_PROTOCOL, | |
CommonHttpProtocolOptions: &core.HttpProtocolOptions{ | |
IdleTimeout: &duration.Duration{Seconds: 5}, | |
}, | |
HttpProtocolOptions: &core.Http1ProtocolOptions{ | |
AcceptHttp_10: true, | |
}, | |
Http2ProtocolOptions: &core.Http2ProtocolOptions{}, | |
MaxRequestsPerConnection: &wrappers.UInt32Value{ | |
Value: 1000, // Probably we need to move it to some settings | |
}, | |
CircuitBreakers: &cluster.CircuitBreakers{ | |
Thresholds: []*cluster.CircuitBreakers_Thresholds{{ | |
Priority: core.RoutingPriority_DEFAULT, | |
MaxConnections: &wrappers.UInt32Value{Value: 2048}, | |
MaxPendingRequests: &wrappers.UInt32Value{Value: 2048}, | |
MaxRequests: &wrappers.UInt32Value{Value: 2048}, | |
MaxRetries: &wrappers.UInt32Value{Value: 60}, | |
}, { | |
Priority: core.RoutingPriority_HIGH, | |
MaxConnections: &wrappers.UInt32Value{Value: 2048}, | |
MaxPendingRequests: &wrappers.UInt32Value{Value: 2048}, | |
MaxRequests: &wrappers.UInt32Value{Value: 2048}, | |
MaxRetries: &wrappers.UInt32Value{Value: 60}, | |
}}, | |
}, | |
UpstreamConnectionOptions: &v2.UpstreamConnectionOptions{ | |
TcpKeepalive: &core.TcpKeepalive{ | |
KeepaliveProbes: &wrappers.UInt32Value{ | |
Value: 2, | |
}, | |
KeepaliveInterval: &wrappers.UInt32Value{ | |
Value: 30, | |
}, | |
}, | |
}, | |
OutlierDetection: &cluster.OutlierDetection{ | |
SuccessRateMinimumHosts: &wrappers.UInt32Value{Value: 2}, | |
EnforcingConsecutiveGatewayFailure: &wrappers.UInt32Value{Value: 100}, | |
}, | |
EdsClusterConfig: &v2.Cluster_EdsClusterConfig{ | |
EdsConfig: BuildConfigSource(), | |
}, | |
} | |
} | |
func endpoints(names []int) []cachetypes.Resource { | |
var result []cachetypes.Resource | |
for _, i := range names { | |
e := &v2.ClusterLoadAssignment{ | |
ClusterName: fmt.Sprintf("egress_com__wixpress__kickstart-zero-to-prod-kseniia-mazan2_main-%d", i), | |
Endpoints: []*endpoint.LocalityLbEndpoints{ | |
{ | |
Locality: &core.Locality{Region: "us-east-1",}, | |
LbEndpoints: []*endpoint.LbEndpoint{ | |
//{ | |
// HostIdentifier: &endpoint.LbEndpoint_Endpoint{ | |
// Endpoint: &endpoint.Endpoint{ | |
// Address: &core.Address{ | |
// Address: &core.Address_SocketAddress{ | |
// SocketAddress: &core.SocketAddress{ | |
// Protocol: core.SocketAddress_TCP, | |
// Address: "127.0.0.1", | |
// PortSpecifier: &core.SocketAddress_PortValue{ | |
// PortValue: 9090, | |
// }, | |
// }, | |
// }, | |
// }, | |
// }, | |
// }, | |
// HealthStatus: core.HealthStatus_HEALTHY, | |
//}, | |
}, | |
Priority: 0, | |
}, | |
}, | |
} | |
result = append(result, e) | |
} | |
return result | |
} | |
func routes(names []int) []cachetypes.Resource { | |
var rr []*route.Route | |
var vhh []*route.VirtualHost | |
for _, i := range names { | |
r := &route.Route{ | |
Match: &route.RouteMatch{ | |
PathSpecifier: &route.RouteMatch_Prefix{ | |
Prefix: fmt.Sprintf("/route-%d", i), | |
}, | |
}, | |
Action: &route.Route_Route{ | |
Route: &route.RouteAction{ | |
PrefixRewrite: fmt.Sprintf("/cluster-%d", i), | |
ClusterSpecifier: &route.RouteAction_Cluster{ | |
Cluster: fmt.Sprintf("egress_com__wixpress__kickstart-zero-to-prod-kseniia-mazan2_main-%d", i), | |
}, | |
RetryPolicy: &route.RetryPolicy{ | |
RetryOn: "retriable-status-codes,refused-stream,connect-failure,unavailable", | |
NumRetries: &wrappers.UInt32Value{Value: 5}, | |
RetriableStatusCodes: []uint32{503, 2, 14}, | |
RetryHostPredicate: []*route.RetryPolicy_RetryHostPredicate{ | |
{ | |
Name: "envoy.retry_host_predicates.previous_hosts", | |
}, | |
}, | |
HostSelectionRetryMaxAttempts: 4, | |
}, | |
}, | |
}, | |
} | |
rr = append(rr, r) | |
vh := &route.VirtualHost{ | |
Name: fmt.Sprintf("vh-%d", i), | |
Domains: []string{fmt.Sprintf("domain-%d", i)}, | |
Routes: []*route.Route{r}, | |
} | |
vhh = append(vhh, vh) | |
} | |
vhh = append(vhh, &route.VirtualHost{ | |
Name: "wildcard-common-ingress", | |
Domains: []string{"*"}, | |
Routes: rr, | |
}) | |
routeConfiguration := &v2.RouteConfiguration{ | |
Name: "route", | |
VirtualHosts: vhh, | |
} | |
return []cachetypes.Resource{routeConfiguration} | |
} | |
func ToAnyOrPanic(message proto.Message) *any.Any { | |
value, err := cache.MarshalResource(message) | |
if err != nil { | |
panic(err) | |
} | |
return &any.Any{TypeUrl: "type.googleapis.com/" + proto.MessageName(message), Value: value} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment