Skip to content

Instantly share code, notes, and snippets.

@jpeach
Created December 2, 2021 01:10
Show Gist options
  • Save jpeach/128b5199a564928d396545f92f7fc3a8 to your computer and use it in GitHub Desktop.
Save jpeach/128b5199a564928d396545f92f7fc3a8 to your computer and use it in GitHub Desktop.
diff --git pkg/server/sotw/v3/server.go pkg/server/sotw/v3/server.go
index b217b6cf..19d90612 100644
--- pkg/server/sotw/v3/server.go
+++ pkg/server/sotw/v3/server.go
@@ -124,14 +124,6 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq
return out.Nonce, str.Send(out)
}
- open := func(w *watch, req *discovery.DiscoveryRequest, responder chan cache.Response) {
- w.cancel = s.cache.CreateWatch(req, streamState, responder)
- watches.cases[w.index] = reflect.SelectCase{
- Dir: reflect.SelectRecv,
- Chan: reflect.ValueOf(responder),
- }
- }
-
if s.callbacks != nil {
if err := s.callbacks.OnStreamOpen(str.Context(), streamID, defaultTypeURL); err != nil {
return err
@@ -206,25 +198,31 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq
if w.nonce == "" || w.nonce == nonce {
w.Cancel()
- open(w, req, responder)
+ watches.addWatch(typeURL, &watch{
+ cancel: s.cache.CreateWatch(req, streamState, responder),
+ nonce: w.nonce,
+ responseChan: responder,
+ })
}
} else {
// No pre-existing watch exists, let's create one.
- // We need to precompute the watches first then open a watch in the cache.
- watches.responders[typeURL] = &watch{}
- w = watches.responders[typeURL]
- watches.RecomputeWatches(s.ctx, reqCh)
-
- open(w, req, responder)
+ watches.addWatch(typeURL, &watch{
+ cancel: s.cache.CreateWatch(req, streamState, responder),
+ responseChan: responder,
+ })
}
+
+ // Recompute the dynamic select cases.
+ watches.RecomputeWatches(s.ctx, reqCh)
default:
// Channel n -> these are the dynamic list of responders that correspond to the stream request typeURL
if !ok {
- return status.Errorf(codes.Unavailable, "resource watch failed")
+ // The responder channel for this watch was closed. TODO(jpeach): decide whether the associated watch should be cancelled here before returning.
+ return status.Errorf(codes.Unavailable, "resource watch %d failed", index)
}
res := value.Interface().(cache.Response)
- nonce, err := send(value.Interface().(cache.Response))
+ nonce, err := send(res)
if err != nil {
return err
}
diff --git pkg/server/sotw/v3/watches.go pkg/server/sotw/v3/watches.go
index 2c40022e..4d7c7fc8 100644
--- pkg/server/sotw/v3/watches.go
+++ pkg/server/sotw/v3/watches.go
@@ -6,6 +6,7 @@ import (
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
+ "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
)
// watches for all xDS resource types
@@ -31,38 +32,40 @@ func (w *watches) Cancel() {
}
}
+func (w *watches) addWatch(typeURL string, watch *watch) {
+ w.responders[typeURL] = watch
+}
+
// recomputeWatches rebuilds the known list of dynamic channels if needed
func (w *watches) RecomputeWatches(ctx context.Context, req <-chan *discovery.DiscoveryRequest) {
- cases := []reflect.SelectCase{
- {
+ w.cases = w.cases[:0] // Clear while retaining capacity.
+
+ w.cases = append(w.cases,
+ reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ctx.Done()),
},
- {
+ reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(req),
},
- }
+ )
- index := len(cases)
- for _, watch := range w.responders {
- cases = append(cases, w.cases[watch.index])
- watch.index = index
- index++
+ for _, r := range w.responders {
+ w.cases = append(w.cases,
+ reflect.SelectCase{
+ Dir: reflect.SelectRecv,
+ Chan: reflect.ValueOf(r.responseChan),
+ },
+ )
}
-
- w.cases = cases
}
// watch contains the necessary modifiables for receiving resource responses
type watch struct {
- cancel func()
- nonce string
-
- // Index is used to track the location of this channel in watches. This allows us
- // to update the channel used at this slot without recomputing the entire list of select
- // statements.
- index int
+ cancel func()
+ nonce string
+ responseChan chan cache.Response
}
// Cancel calls terminate and cancel
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment