Last active
June 3, 2019 18:28
-
-
Save njhale/5e20a68102cd1e4b0f142a165ae19113 to your computer and use it in GitHub Desktop.
Modeling Kubernetes Controllers as Observer Graphs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package subscription | |
import ( | |
"context" | |
"sync" | |
"time" | |
"github.com/sirupsen/logrus" | |
"github.com/pkg/errors" | |
"golang.org/x/sync/errgroup" | |
"golang.org/x/sync/semaphore" | |
"k8s.io/client-go/tools/cache" | |
"k8s.io/client-go/util/workqueue" | |
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/apis/operators" | |
) | |
// Event is the payload that travels along the edges of the event graph.
// It is an empty interface, so any value can be passed between vertices.
type Event interface{}
// Observer describes something that can be notified of an event. | |
// It can be thought of as a sink vertex. | |
type Observer interface { | |
Subscribe(ctx context.Context, observables ...Observable) error | |
Notify(ctx context.Context, event Event) error | |
} | |
// Act is an action that can be taken on an observer. | |
type Act func(observer Observer) error | |
// ObserverList describes a list of Observers. | |
type ObserverList interface { | |
Add(ctx context.Context, observers ...Observer) error | |
ForEach(do Act) error | |
} | |
// Observable describes an ObserverList that can notify each of its observers of an event. | |
// It can be used to represent/inject events from external sources. | |
// It can be thought of as a source vertex. | |
type Observable interface { | |
ObserverList | |
NotifyObservers(ctx context.Context, event Event) error | |
// How would the concept of a finite Observable fit? | |
// Would we add a Completed method to signify a complete Observable? | |
} | |
// Subject describes an Observer that is Observable. | |
// This can be used to apply transforms and forward events between observables, observers, and subjects. | |
// It can be thought of as an internal vertex. | |
type Subject interface { | |
Observer | |
Observable | |
} | |
type SimpleObserverList []Observer | |
func (s SimpleObserverList) Add(ctx context.Context, observers ...Observer) error { | |
if len(observers) == 0 { | |
return errors.Error("at least one observer must be provided to add") | |
} | |
for _, obe := range s { | |
if obe == nil { | |
return errors.Error("cannot add nil observer") | |
} | |
} | |
s = append(s, observers...) | |
return nil | |
} | |
func (s SimpleObserverList) ForEach(do Act) error { | |
for i := 0; i < len(s); i++ { | |
if err := do(s[i]); err != nil { | |
return err | |
} | |
} | |
return nil | |
} | |
type ThreadSafeObserverList struct { | |
ObserverList | |
mu sync.RWMutex | |
} | |
func (t *ThreadSafeObserverList) Add(ctx context.Context, observers ...Observer) error { | |
t.mu.Lock() | |
defer t.mu.Unlock() | |
return t.ObserverList.Add(ctx, observers...) | |
} | |
func (t *ThreadSafeObserverList) ForEach(do Act) error { | |
t.mu.RLock() | |
defer t.mu.RUnlock() | |
return t.ObserverList.ForEach(do) | |
} | |
type SimpleObservable struct { | |
ObserverList | |
} | |
func (t *SimpleObservable) NotifyObservers(ctx context.Context, event Event) error { | |
return t.ForEach(func(observer Observer) error { | |
return observer.Notify(ctx, event) | |
}) | |
} | |
// InformerObservable allows an informer to act as an event source. | |
// It notifies observers of its informer events. | |
type InformerObservable struct { | |
Observable | |
informer cache.SharedIndexInformer | |
logger logrus.Logger | |
} | |
// Add registers the given set of observers and rigs the observable's informer events to notify them. | |
func (i *InformerObservable) Add(ctx context.Context, observers ...Observer) error { | |
// Add the observers first to handle any invariants | |
if err := i.Observable.Add(ctx, observers...); err != nil { | |
return err | |
} | |
// Add an EventHandlerFunc to notify each new observer | |
return SimpleObserverList(observers).ForEach(func(observer Observer) error { | |
i.informer.AddEventHandler(&cache.ResourceEventHandlerFuncs{ | |
AddFunc: func(obj interface{}) { | |
if err := observer.Notify(ctx, Event(obj)); err != nil { | |
logrus.WithError(err).Warn("error notifying informer observer of add event") | |
} | |
}, | |
DeleteFunc: func(obj interface{}) { | |
if err := observer.Notify(ctx, Event(obj)); err != nil { | |
logrus.WithError(err).Warn("error notifying informer observer of delete event") | |
} | |
}, | |
UpdateFunc: func(oldObj, newObj interface{}) { | |
if err := observer.Notify(ctx, Event(newObj)); err != nil { | |
logrus.WithError(err).Warn("error notifying informer observer of update event") | |
} | |
}, | |
}) | |
return nil | |
}) | |
} | |
// SimpleSubject implements the minimum requirements of a Subject. | |
// It is useful as a basis in composing more complex subjects. | |
type SimpleSubject struct { | |
Observable | |
} | |
// Subscribe adds the subject to each of the given observables. | |
func (s *SimpleSubject) Subscribe(ctx context.Context, observables ...Observable) error { | |
if len(observables) == 0 { | |
return errors.New("at least one observable must be provided to subscribe") | |
} | |
for _, oba := range observables { | |
if oba == nil { | |
return errors.New("cannot subscribe to nil observable") | |
} | |
if err := oba.Add(ctx, s); err != nil { | |
return err | |
} | |
} | |
return nil | |
} | |
// Notify forwards the given event to the subject's observers without modification. | |
func (s *SimpleSubject) Notify(ctx context.Context, event Event) error { | |
return s.NotifyObservers(ctx, event) | |
} | |
// ConcurrentSubject forwards notifications to its observers concurrently. | |
type ConcurrentSubject struct { | |
Subject | |
sem *semaphore.Weighted | |
timeout time.Duration | |
} | |
func (c *ConcurrentSubject) NotifyObservers(ctx context.Context, event Event) error { | |
deadline, cancel := context.WithTimeout(ctx, c.timeout) | |
defer cancel() | |
g, gctx := errgroup.WithContext(deadline) | |
c.ForEach(func(observer Observer) error { | |
c.sem.Acquire(gctx, 1) | |
g.Go(func() error { | |
defer c.sem.Release(1) | |
return observer.Notify(gctx, event) | |
}) | |
return nil | |
}) | |
return g.Wait() | |
} | |
// QueueSubject uses a workqueue to act as a notification queue to its observers. | |
type QueueSubject struct { | |
Subject | |
queue workqueue.RateLimitingInterface | |
once sync.Once | |
maxRequeues int | |
workers int | |
sem *semaphore.Weighted | |
timeout time.Duration | |
logger logrus.Logger | |
} | |
// Notify adds the given event to the subject's queue and calls NotifyObservers | |
// to start processing the queue. | |
func (q *QueueSubject) Notify(ctx context.Context, event Event) error { | |
q.queue.Add(event) | |
return q.NotifyObservers(ctx, event) | |
} | |
// NotifyObservers starts queue processing the first time it's called. | |
// Subsequent calls to this method are idempotent. | |
func (q *QueueSubject) NotifyObservers(ctx context.Context, event Event) error { | |
// Only start once | |
q.once.Do(func() { | |
// TODO: Move worker setup to constructor | |
go func() { | |
// Process queue items | |
if err := q.process(ctx); err != nil { | |
q.logger.WithError(err).Panic("error processing queue") | |
} | |
}() | |
}) | |
return nil | |
} | |
func (q *QueueSubject) process(ctx context.Context) error { | |
// Schedule a deadline context for workers to start | |
deadline, cancel := context.WithTimeout(ctx, q.timeout) | |
defer cancel() | |
// Create an error group to wait for workers to complete | |
g, gctx := errgroup.WithContext(ctx) | |
for w := 0; w < q.workers; w++ { | |
if err := q.sem.Acquire(deadline, 1); err != nil { | |
return err | |
} | |
g.Go(func() error { | |
defer q.sem.Release(1) | |
for q.work(gctx) { | |
} | |
return nil | |
}) | |
} | |
return g.Wait() | |
} | |
func (q *QueueSubject) work(ctx context.Context) bool { | |
queue := q.queue | |
item, quit := queue.Get() | |
if quit { | |
return false | |
} | |
defer queue.Done(item) | |
// Schedule a work deadline | |
deadline, cancel := context.WithTimeout(ctx, q.timeout) | |
defer cancel() | |
logger := q.logger.WithField("item", item) | |
logger.Debug("notifying observers") | |
// Requeue on error | |
if requeues, err := queue.NumRequeues(item), q.Subject.Notify(deadline, item); err != nil { | |
logger = logger.WithError(err).WithFields(logrus.Fields{ | |
"requeues": requeues, | |
"limit": q.maxRequeues, | |
}) | |
if requeues < q.maxRequeues { | |
logger.Debug("requeuing item") | |
queue.AddRateLimited(item) | |
return true | |
} | |
logger.Debug("requeue limit reached, forgetting item") | |
} | |
queue.Forget(item) | |
return true | |
} | |
// CacheSubject searches its cache for items matching notification events and | |
// notifies its observers with events containing those items if found. | |
type CacheSubject struct { | |
Subject | |
indexer cache.Indexer | |
keyFunc cache.KeyFunc | |
logger logrus.Logger | |
} | |
// Notify notifies the subject of a potential change in its cache. | |
// This method attempts to retrieve a stored item matching the key derived from the given | |
// event and registered KeyFunc. If a matching item is found, the subject's observers are | |
// notified with a new event containing it. | |
func (c *CacheSubject) Notify(ctx context.Context, event Event) error { | |
key, err := c.keyFunc(event) | |
if err != nil { | |
// Ignore key func errors | |
c.logger.WithError(err).WithField("event", event).Debug("failed to build key") | |
return nil | |
} | |
logger := c.logger.WithField("key", key) | |
item, exists, err := c.indexer.GetByKey(key) | |
if err != nil { | |
return err | |
} | |
if !exists { | |
logger.Debug("not found in cache") | |
return nil | |
} | |
out := Event(item) | |
return c.Subject.Notify(ctx, out) | |
} | |
// HandleSubscription provides a strongly-typed interface for handling changes | |
// to unversioned Subscriptions. | |
type HandleSubscription func(*operators.Subscription) error | |
// SubscriptionObserver is a consumer of Subscription events, invoking its handler when notified. | |
// It ignores all events not of dynamic type Subscription. | |
type SubscriptionObserver struct { | |
Observer | |
handle HandleSubscription | |
logger logrus.Logger | |
} | |
// Notify notifies the observer of a Subscription event, which invokes its handler. | |
// This method ignores all events not of dynamic type Subscription. | |
func (s *SubscriptionObserver) Notify(ctx context.Context, event Event) error { | |
sub, ok := event.(*operators.Subscription) | |
if !ok { | |
// Skip since the subject and its observers don't care about events of this type | |
s.logger.WithField("event", event).Debugf("event not of type %T, skipping", new(*v1alpha1.Subscription)) | |
return nil | |
} | |
return s.handle(sub) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment