@alecholmez
Last active October 12, 2021 17:21
Incremental xDS Implementation Plan

  • Authors: Alec Holmes
  • Reviewers: (names here)
  • Status: Planning

Contents

Control Plane

Introduction

The envoyproxy/go-control-plane project has gained a lot of traction over the past year. With the rollout of Envoy 1.13, a new xDS feature was introduced to remedy performance issues when running the Envoy discovery services at large scale (100K+ configuration items). Deployments can grow quickly, and sending complete "State of the World" (SotW) configuration updates becomes very expensive at scale. Incremental xDS (Delta xDS) removes the need to broadcast the full configuration to Envoy sidecars. Proxies can now receive only the changes relative to the configuration they already hold, limited to the resources they have subscribed to.

Motivation

This feature has not yet been adopted or implemented in the open source project. The need for a performant control plane at scale is greater than ever. Implementing Incremental xDS will help deployments that require fast discovery and adaptability. This change belongs in go-control-plane because every project that depends on it will inherit the functionality while keeping backwards compatibility with the SotW xDS mechanisms.

Snapshot Cache

Resource Name Requesting

Envoys now have the ability to subscribe to the resources that should be watched. The cache that holds configuration snapshots will need a way to look up resources by the names/aliases a client has requested.

// GetSubscribedResources selects requested snapshot resources by type and alias.
// This function is used for Incremental/Delta xDS and follows the subscribed resource model.
func (s *Snapshot) GetSubscribedResources(aliases []string, typeURL string) map[string]Resource {
	if s == nil {
		return nil
	}

	t := GetResponseType(typeURL)
	if t == UnknownType {
		return nil
	}

	subscribed := make(map[string]Resource, len(aliases))

	r := s.Resources[t].Items
	// This is currently O(len(items) * len(aliases)), which will not scale. Will need to revisit.
	for _, item := range r {
		for _, alias := range aliases {
			if GetResourceName(item) == alias {
				subscribed[alias] = item
			}
		}
	}

	return subscribed
}
  • Create deletion functions for delta xDS in the snapshot map
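
One way the nested loop in GetSubscribedResources could be revisited is to build a set of the requested aliases first and then make a single pass over the items. The following is a minimal sketch of that idea, not the go-control-plane implementation: the `Resource` struct and the `selectSubscribed` helper are hypothetical stand-ins so the example compiles on its own.

```go
package main

// Resource is a hypothetical stand-in for the cache's resource type;
// only the name matters for this sketch.
type Resource struct {
	Name string
}

// selectSubscribed returns the items whose names appear in aliases.
// Building the alias set first makes the work O(len(items) + len(aliases))
// instead of comparing every item against every alias.
func selectSubscribed(items []Resource, aliases []string) map[string]Resource {
	wanted := make(map[string]struct{}, len(aliases))
	for _, alias := range aliases {
		wanted[alias] = struct{}{}
	}

	subscribed := make(map[string]Resource, len(aliases))
	for _, item := range items {
		if _, ok := wanted[item.Name]; ok {
			subscribed[item.Name] = item
		}
	}
	return subscribed
}
```

Aliases that match no item are simply absent from the result, which matches the behavior of the nested-loop version.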

Delta Discovery Request/Response

Incremental xDS uses different Discovery Request/Response objects from the pre-existing discovery mechanisms. Go-control-plane will need to build its new logic around those newly defined structures while also keeping backwards compatibility. We can mimic the existing logic with minimal modification, but some duplication will be needed.

func createDeltaResponse(request DeltaRequest, resources map[string]Resource, version string) DeltaResponse {
	filtered := make([]Resource, 0, len(resources))

	// Reply only with the requested resources. Envoy may ask each resource
	// individually in a separate stream. It is ok to reply with the same version
	// on separate streams since requests do not share their response versions.
	if len(request.ResourceNamesSubscribe) != 0 {
		set := nameSet(request.ResourceNamesSubscribe)
		for name, resource := range resources {
			if set[name] {
				filtered = append(filtered, resource)
			}
		}
	} else {
		for _, resource := range resources {
			filtered = append(filtered, resource)
		}
	}

	return DeltaResponse{
		DeltaRequest: request,
		Version:      version,
		Resources:    filtered,
	}
}

// FetchDelta returns the subscribed resources from the node's current snapshot for a delta request.
func (cache *snapshotCache) FetchDelta(ctx context.Context, request DeltaRequest) (*DeltaResponse, error) {
	nodeID := cache.hash.ID(request.Node)

	cache.mu.RLock()
	defer cache.mu.RUnlock()

	if snapshot, exists := cache.snapshots[nodeID]; exists {
		// Unlike the SotW Fetch, we don't need to compare the request version
		// against the snapshot in a Delta request/response cycle.
		version := snapshot.GetVersion(request.TypeUrl)

		// Fetch only the resources the client subscribed to (relies on GetSubscribedResources).
		resources := snapshot.GetSubscribedResources(request.GetResourceNamesSubscribe(), request.TypeUrl)

		out := createDeltaResponse(request, resources, version)
		return &out, nil
	}

	return nil, fmt.Errorf("missing snapshot for %q", nodeID)
}

Cache Request/Response Objects

Go-control-plane has wrappers for Envoy's defined DiscoveryRequest and DiscoveryResponse objects. New wrappers will be needed for the Delta counterparts:

Request

// DeltaRequest is an alias for the delta discovery request type.
type DeltaRequest = v2.DeltaDiscoveryRequest

Response

// DeltaResponse is a pre-serialized xDS response.
type DeltaResponse struct {
	// DeltaRequest is the original request.
	DeltaRequest v2.DeltaDiscoveryRequest

	// Version of the resources as tracked by the cache for the given type.
	// Proxy responds with this version as an acknowledgement.
	Version string

	// The value indicating whether the resource is marshaled, and only one of `Resources` and `MarshaledResources` is available.
	ResourceMarshaled bool

	// Resources to be included in the response.
	Resources []Resource

	// Marshaled Resources to be included in the response.
	MarshaledResources []MarshaledResource
}

These are based on the previous Request/Response wrappers in go-control-plane, with the embedded Envoy object swapped for its Delta counterpart.

Server Implementation

Revised Callback Interface

New callback methods are needed to support DeltaDiscoveryRequests/Responses. We need backwards compatibility with the pre-existing SotW xDS logic, so modifying the existing callback methods is not a suitable way to introduce this capability. An example implementation could be the following:

// Callbacks is a collection of callbacks inserted into the server operation.
// The callbacks are invoked synchronously.
type Callbacks interface {
	// OnStreamOpen is called once an xDS stream is open with a stream ID and the type URL (or "" for ADS).
	// Returning an error will end processing and close the stream. OnStreamClosed will still be called.
	OnStreamOpen(context.Context, int64, string) error
	// OnStreamClosed is called immediately prior to closing an xDS stream with a stream ID.
	OnStreamClosed(int64)
	// OnStreamRequest is called once a request is received on a stream.
	// Returning an error will end processing and close the stream. OnStreamClosed will still be called.
	OnStreamRequest(int64, *v2.DiscoveryRequest) error
	// OnStreamResponse is called immediately prior to sending a response on a stream.
	OnStreamResponse(int64, *v2.DiscoveryRequest, *v2.DiscoveryResponse)

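	// Incremental xDS only supports gRPC streams.
	// OnStreamDeltaRequest mirrors OnStreamRequest for delta requests received on a stream.
	OnStreamDeltaRequest(int64, *v2.DeltaDiscoveryRequest) error
	// OnStreamDeltaResponse mirrors OnStreamResponse and is called immediately prior to sending a delta response.
	OnStreamDeltaResponse(int64, *v2.DeltaDiscoveryRequest, *v2.DeltaDiscoveryResponse)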

	// OnFetchRequest is called for each Fetch request. Returning an error will end processing of the
	// request and respond with an error.
	OnFetchRequest(context.Context, *v2.DiscoveryRequest) error
	// OnFetchResponse is called immediately prior to sending a response.
	OnFetchResponse(*v2.DiscoveryRequest, *v2.DiscoveryResponse)
}

Note that the Delta callback methods take the alternative Envoy types, *v2.DeltaDiscoveryRequest and *v2.DeltaDiscoveryResponse.

Revised Handler/Processor Mechanism

Go-control-plane has universal handler() and process() methods that create watches on specific Envoy data types. A mirror of this functionality with a Delta handler and processor will create watches for specific request resources on the subscribed resource list:

// deltaHandler converts a blocking read call to channels and initiates stream processing for incremental changes
func (s *server) deltaHandler(stream stream, typeURL string) error {
	// a channel for receiving incoming requests
	reqCh := make(chan *v2.DeltaDiscoveryRequest)
	reqStop := int32(0)
	go func() {
		for {
			req, err := stream.Recv()
			if atomic.LoadInt32(&reqStop) != 0 {
				return
			}
			if err != nil {
				close(reqCh)
				return
			}
			reqCh <- req
		}
	}()

	// Data processing and subscription parsing will take place in here
	err := s.deltaProcess(stream, reqCh, typeURL)

	// prevents writing to a closed channel if send failed on blocked recv
	// TODO(kuat) figure out how to unblock recv through gRPC API
	atomic.StoreInt32(&reqStop, 1)

	return err
}
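
The handler above guards the request channel with an atomic flag. As a point of comparison, here is a minimal sketch of the same "blocking Recv to channel" conversion that uses a `done` channel and `select` instead, so the forwarding goroutine cannot stay blocked on a send after processing stops. Everything in it is an assumption made for illustration: `receiver`, `sliceReceiver`, `errStreamClosed`, and `pump` are hypothetical names, and messages are plain strings rather than delta requests.

```go
package main

import "errors"

// errStreamClosed is a hypothetical sentinel returned once the stream ends.
var errStreamClosed = errors.New("stream closed")

// receiver is a hypothetical stand-in for the blocking Recv side of a gRPC stream.
type receiver interface {
	Recv() (string, error)
}

// sliceReceiver is a test double that yields fixed messages, then an error.
type sliceReceiver struct {
	msgs []string
}

func (r *sliceReceiver) Recv() (string, error) {
	if len(r.msgs) == 0 {
		return "", errStreamClosed
	}
	msg := r.msgs[0]
	r.msgs = r.msgs[1:]
	return msg, nil
}

// pump forwards messages from r onto the returned channel until Recv fails
// or done is closed. The channel is closed when the goroutine exits.
func pump(r receiver, done <-chan struct{}) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for {
			msg, err := r.Recv()
			if err != nil {
				return
			}
			select {
			case out <- msg:
			case <-done:
				return
			}
		}
	}()
	return out
}
```

This only addresses the send side. A goroutine that is still blocked inside Recv is not released by closing `done`, so the TODO about unblocking recv through the gRPC API still applies.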

xDS Server Function Implementation

The same pattern used for the previously implemented gRPC server functions will be followed. The Delta* server functions that are currently unimplemented will need to be updated to call into the handler/processor, which handles watch generation.

func (s *server) DeltaAggregatedResources(stream discoverygrpc.AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error {
	return s.deltaHandler(stream, cache.AnyType)
}

func (s *server) DeltaEndpoints(stream v2grpc.EndpointDiscoveryService_DeltaEndpointsServer) error {
	return s.deltaHandler(stream, cache.EndpointType)
}

func (s *server) DeltaClusters(stream v2grpc.ClusterDiscoveryService_DeltaClustersServer) error {
	return s.deltaHandler(stream, cache.ClusterType)
}

func (s *server) DeltaRoutes(stream v2grpc.RouteDiscoveryService_DeltaRoutesServer) error {
	return s.deltaHandler(stream, cache.RouteType)
}

func (s *server) DeltaListeners(stream v2grpc.ListenerDiscoveryService_DeltaListenersServer) error {
	return s.deltaHandler(stream, cache.ListenerType)
}

func (s *server) DeltaSecrets(stream discoverygrpc.SecretDiscoveryService_DeltaSecretsServer) error {
	return s.deltaHandler(stream, cache.SecretType)
}

func (s *server) DeltaRuntime(stream discoverygrpc.RuntimeDiscoveryService_DeltaRuntimeServer) error {
	return s.deltaHandler(stream, cache.RuntimeType)
}

Diffing Algorithm

  • The management server needs support for broadcasting updates when a delta is found in resources
  • A delta diffing algorithm needs to be implemented (it must be fast enough to work at scale)
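
As a starting point for discussion, a diff could be computed from per-resource versions alone, without comparing resource bodies. The sketch below assumes each resource name maps to a version string for both the state the client already holds and the latest snapshot. The `diffVersions` name and the `map[string]string` representation are assumptions for illustration, not existing go-control-plane APIs.

```go
package main

import "sort"

// diffVersions compares the per-resource versions a client already holds
// (prev) against the versions in the latest snapshot (next). It returns the
// names that are new or changed and the names that were removed.
// Results are sorted so the output is deterministic.
func diffVersions(prev, next map[string]string) (changed, removed []string) {
	for name, version := range next {
		if old, ok := prev[name]; !ok || old != version {
			changed = append(changed, name)
		}
	}
	for name := range prev {
		if _, ok := next[name]; !ok {
			removed = append(removed, name)
		}
	}
	sort.Strings(changed)
	sort.Strings(removed)
	return changed, removed
}
```

Each map is walked once, so the comparison itself is linear in the number of resources. The sorting is only there for stable output and could be dropped if ordering does not matter.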

Sub/UnSub Mechanism

Incremental xDS introduces the ability for the management server to spontaneously send updates to Envoys when changes are detected in the snapshot cache. This removes the need for Envoys to continually request resource types to learn the state of a mesh. To achieve this, Envoys subscribe to the resources they want to watch. Whenever a change is detected on those resources, the management server sends the updates to all Envoys subscribed to that specific resource by name or alias.
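
The bookkeeping this implies can be sketched as a per-stream set of subscribed names that is updated from each delta request's subscribe and unsubscribe lists. The `subscriptions` type and its methods are hypothetical. Applying unsubscribes after subscribes within one request is a choice made by this sketch, not something taken from the protocol definition.

```go
package main

// subscriptions tracks the resource names one stream is subscribed to.
type subscriptions map[string]struct{}

// apply updates the set from the subscribe and unsubscribe lists of a single
// delta request. In this sketch, unsubscribes are applied last.
func (s subscriptions) apply(subscribe, unsubscribe []string) {
	for _, name := range subscribe {
		s[name] = struct{}{}
	}
	for _, name := range unsubscribe {
		delete(s, name)
	}
}

// interested reports whether an update to the named resource should be
// pushed on this stream.
func (s subscriptions) interested(name string) bool {
	_, ok := s[name]
	return ok
}
```

When a snapshot changes, the server would consult `interested` for each changed resource before pushing it on a given stream.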

Creating Delta Watches

All Incremental xDS streams will need watches created when new resources are subscribed to. During the subscription process, each resource alias receives its own delta watch, which is monitored for changes/failures. When a failure occurs, a watch can be killed and re-requested to bring the Envoy resource stream back in sync with the management server.

func (cache *snapshotCache) CreateDeltaWatch(request DeltaRequest) (chan DeltaResponse, func()) {
	nodeID := cache.hash.ID(request.Node)

	cache.mu.Lock()
	defer cache.mu.Unlock()

	info, ok := cache.status[nodeID]
	if !ok {
		info = newStatusInfo(request.Node)
		cache.status[nodeID] = info
	}

	// update last watch request time
	info.mu.Lock()
	info.lastWatchRequestTime = time.Now()
	info.mu.Unlock()

	// allocate capacity 1 to allow one-time non-blocking use
	value := make(chan DeltaResponse, 1)

	snapshot, exists := cache.snapshots[nodeID]
	version := snapshot.GetVersion(request.TypeUrl)

	// if the requested version is up-to-date or missing a response, leave an open watch
	if !exists || request.VersionInfo == version {
		watchID := cache.nextWatchID()
		if cache.log != nil {
			cache.log.Debugf("open watch %d for %s%v from nodeID %q, version %q", watchID,
				request.TypeUrl, request.ResourceNames, nodeID, request.VersionInfo)
		}
		info.mu.Lock()
		info.watches[watchID] = ResponseWatch{Request: request, Response: value}
		info.mu.Unlock()
		return value, cache.cancelWatch(nodeID, watchID)
	}

	// otherwise, the watch may be responded to immediately
	cache.respondDelta(request, value, snapshot.GetResources(request.TypeUrl), version)

	return value, nil
}

Subscribed Resource Response

When a change is detected in the *snapshotCache, the management server sends a response to all Envoys subscribed to the watch that detected the change.

// Respond to a watch with the snapshot value. The value channel should have capacity not to block.
func (cache *snapshotCache) respondDelta(request DeltaRequest, value chan DeltaResponse, resources map[string]Resource, version string) {
	// for ADS, the request names must match the snapshot names
	// if they do not, then the watch is never responded, and it is expected that envoy makes another request
	if len(request.ResourceNames) != 0 && cache.ads {
		if err := superset(nameSet(request.ResourceNames), resources); err != nil {
			if cache.log != nil {
				cache.log.Debugf("ADS mode: not responding to request: %v", err)
			}
			return
		}
	}
	if cache.log != nil {
		cache.log.Debugf("respond %s%v version %q with version %q",
			request.TypeUrl, request.ResourceNames, request.VersionInfo, version)
	}

	value <- createDeltaResponse(request, resources, version)
}

Unsubscribing On A Resource

Once an Envoy unsubscribes from a resource, there is no longer a need to hold a goroutine with a watch on that resource. The watch can be killed for that Envoy, but the stream should stay open for other resources.

switch {
case req.TypeUrl == cache.EndpointType && (values.endpointNonce == "" || values.endpointNonce == nonce):
	if values.endpointCancel != nil {
		values.endpointCancel()
	}
	values.endpoints, values.endpointCancel = s.cache.CreateWatch(*req)
case req.TypeUrl == cache.ClusterType && (values.clusterNonce == "" || values.clusterNonce == nonce):
	if values.clusterCancel != nil {
		values.clusterCancel()
	}
	values.clusters, values.clusterCancel = s.cache.CreateWatch(*req)
case req.TypeUrl == cache.RouteType && (values.routeNonce == "" || values.routeNonce == nonce):
	if values.routeCancel != nil {
		values.routeCancel()
	}
	values.routes, values.routeCancel = s.cache.CreateWatch(*req)
case req.TypeUrl == cache.ListenerType && (values.listenerNonce == "" || values.listenerNonce == nonce):
	if values.listenerCancel != nil {
		values.listenerCancel()
	}
	values.listeners, values.listenerCancel = s.cache.CreateWatch(*req)
case req.TypeUrl == cache.SecretType && (values.secretNonce == "" || values.secretNonce == nonce):
	if values.secretCancel != nil {
		values.secretCancel()
	}
	values.secrets, values.secretCancel = s.cache.CreateWatch(*req)
case req.TypeUrl == cache.RuntimeType && (values.runtimeNonce == "" || values.runtimeNonce == nonce):
	if values.runtimeCancel != nil {
		values.runtimeCancel()
	}
	values.runtimes, values.runtimeCancel = s.cache.CreateWatch(*req)
}
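
The switch above cancels and recreates one watch per resource type. For per-resource unsubscription, one possible shape is a registry of cancel functions keyed by resource name, so that a single watch can be cancelled while the rest of the stream is left alone. This is a sketch under that assumption: `watchSet`, `newWatchSet`, `add`, and `unsubscribe` are hypothetical names.

```go
package main

// watchSet tracks one cancel function per subscribed resource name on a stream.
type watchSet struct {
	cancels map[string]func()
}

func newWatchSet() *watchSet {
	return &watchSet{cancels: make(map[string]func())}
}

// add registers a watch for name, cancelling any watch it replaces.
func (w *watchSet) add(name string, cancel func()) {
	if old, ok := w.cancels[name]; ok && old != nil {
		old()
	}
	w.cancels[name] = cancel
}

// unsubscribe cancels and forgets the watches for the given names and
// returns how many were cancelled. Other watches are left untouched.
func (w *watchSet) unsubscribe(names []string) int {
	cancelled := 0
	for _, name := range names {
		cancel, ok := w.cancels[name]
		if !ok {
			continue
		}
		if cancel != nil {
			cancel()
		}
		delete(w.cancels, name)
		cancelled++
	}
	return cancelled
}
```

The nil checks mirror the existing code, where a cancel function may be nil when a watch was answered immediately.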

Failure Scenarios

Configuration Conflict

When the go-control-plane SotW xDS server encounters an invalid snapshot, in its current implementation, it will continuously push out the invalid version and enter a rejection loop with Envoy. When a new configuration is received, go-control-plane bumps the version and broadcasts the new items over the pre-existing xDS stream.

Two Possible Failure Scenario Implementations

  1. We retry pushing the snapshot out until we get an ACK
  2. We capture the NACK, log, and then sit on the stream and wait until Envoy requests new resources

One option is to have Incremental xDS streams follow the same pattern when a snapshot is rejected (a continuous rejection loop), but with exponential backoff. Currently the snapshot is resent immediately after a rejection, which can cause CPU overload if left unwatched. See issue/46. The backoff logic would be something similar to jpillora/backoff.
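
To make the backoff idea concrete, here is a minimal standard-library sketch of a capped exponential delay. It does not use jpillora/backoff and is not a proposal for the final API. The `nextBackoff` name and its parameters are assumptions.

```go
package main

import "time"

// nextBackoff returns the delay to wait before retry number attempt
// (0-based): base doubled once per attempt, capped at max.
func nextBackoff(attempt int, base, max time.Duration) time.Duration {
	delay := base
	for i := 0; i < attempt; i++ {
		delay *= 2
		// Stop doubling as soon as the cap is reached to avoid overflow.
		if delay >= max {
			return max
		}
	}
	if delay > max {
		return max
	}
	return delay
}
```

With a base of 100ms and a cap of 5s, retries would wait 100ms, 200ms, 400ms, and so on until the cap is reached. The attempt counter would presumably be reset once Envoy ACKs a snapshot.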

The second option more closely follows how a basic Incremental stream request/response flow should behave. If a NACK is received, the management server can keep track of it, and when a new update becomes available within the server, it can send a new DiscoveryResponse with a new nonce in an effort to jump over the failed config application.
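
A rough sketch of the bookkeeping behind the second option follows. It remembers which version was rejected on a stream and declines to push that same version again, while letting a newer version through under a fresh nonce. The `streamState` type, its fields, and the counter-based nonce are all assumptions for illustration. How a NACK is actually detected on a delta request is left out.

```go
package main

import "strconv"

// streamState holds hypothetical per-stream bookkeeping for NACK handling.
type streamState struct {
	seq             int
	lastNonce       string
	lastVersion     string
	rejectedVersion string
}

// send records that version is being pushed and returns a fresh nonce for it.
func (s *streamState) send(version string) string {
	s.seq++
	s.lastNonce = strconv.Itoa(s.seq)
	s.lastVersion = version
	return s.lastNonce
}

// observe records the client's reply to the response identified by nonce.
// Replies that carry a stale nonce are ignored.
func (s *streamState) observe(nonce string, nack bool) {
	if nonce != s.lastNonce {
		return
	}
	if nack {
		s.rejectedVersion = s.lastVersion
	} else {
		s.rejectedVersion = ""
	}
}

// shouldPush reports whether version is worth sending: anything except the
// version the client most recently rejected.
func (s *streamState) shouldPush(version string) bool {
	return version != s.rejectedVersion
}
```

With this in place the server sits on the stream after a NACK and only responds again once the snapshot version moves past the rejected one.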

During reconnect, Envoy can send its previously known resources to avoid resending. No state is preserved from previous streams. Connecting clients will provide the management server with all resource names/aliases it is interested in.

@lita
lita commented Jan 3, 2020

Can we add a section here about failure scenarios? Such as what should happen when an update is missed and if there is a conflict?

@alecholmez
Author

Sure thing, I'm brainstorming those right now; that's an important section. If you have any ideas as well, I'd love to hear them!