Skip to content

Instantly share code, notes, and snippets.

@njhale
Last active June 3, 2019 18:28
Show Gist options
  • Save njhale/5e20a68102cd1e4b0f142a165ae19113 to your computer and use it in GitHub Desktop.
Save njhale/5e20a68102cd1e4b0f142a165ae19113 to your computer and use it in GitHub Desktop.
Modeling Kubernetes Controllers as Observer Graphs
package subscription
import (
"context"
"sync"
"time"
"github.com/sirupsen/logrus"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/apis/operators"
)
// Event describes any event traversing the event graph.
type Event interface{}
// Observer describes something that can be notified of an event.
// It can be thought of as a sink vertex.
type Observer interface {
Subscribe(ctx context.Context, observables ...Observable) error
Notify(ctx context.Context, event Event) error
}
// Act is an action that can be taken on an observer.
type Act func(observer Observer) error
// ObserverList describes a list of Observers.
type ObserverList interface {
Add(ctx context.Context, observers ...Observer) error
ForEach(do Act) error
}
// Observable describes an ObserverList that can notify each of its observers of an event.
// It can be used to represent/inject events from external sources.
// It can be thought of as a source vertex.
type Observable interface {
ObserverList
NotifyObservers(ctx context.Context, event Event) error
// How would the concept of a finite Observable fit?
// Would we add a Completed method to signify a complete Observable?
}
// Subject describes an Observer that is Observable.
// This can be used to apply transforms and forward events between observables, observers, and subjects.
// It can be thought of as an internal vertex.
type Subject interface {
Observer
Observable
}
type SimpleObserverList []Observer
func (s SimpleObserverList) Add(ctx context.Context, observers ...Observer) error {
if len(observers) == 0 {
return errors.Error("at least one observer must be provided to add")
}
for _, obe := range s {
if obe == nil {
return errors.Error("cannot add nil observer")
}
}
s = append(s, observers...)
return nil
}
func (s SimpleObserverList) ForEach(do Act) error {
for i := 0; i < len(s); i++ {
if err := do(s[i]); err != nil {
return err
}
}
return nil
}
type ThreadSafeObserverList struct {
ObserverList
mu sync.RWMutex
}
func (t *ThreadSafeObserverList) Add(ctx context.Context, observers ...Observer) error {
t.mu.Lock()
defer t.mu.Unlock()
return t.ObserverList.Add(ctx, observers...)
}
func (t *ThreadSafeObserverList) ForEach(do Act) error {
t.mu.RLock()
defer t.mu.RUnlock()
return t.ObserverList.ForEach(do)
}
type SimpleObservable struct {
ObserverList
}
func (t *SimpleObservable) NotifyObservers(ctx context.Context, event Event) error {
return t.ForEach(func(observer Observer) error {
return observer.Notify(ctx, event)
})
}
// InformerObservable allows an informer to act as an event source.
// It notifies observers of its informer events.
type InformerObservable struct {
Observable
informer cache.SharedIndexInformer
logger logrus.Logger
}
// Add registers the given set of observers and rigs the observable's informer events to notify them.
func (i *InformerObservable) Add(ctx context.Context, observers ...Observer) error {
// Add the observers first to handle any invariants
if err := i.Observable.Add(ctx, observers...); err != nil {
return err
}
// Add an EventHandlerFunc to notify each new observer
return SimpleObserverList(observers).ForEach(func(observer Observer) error {
i.informer.AddEventHandler(&cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if err := observer.Notify(ctx, Event(obj)); err != nil {
logrus.WithError(err).Warn("error notifying informer observer of add event")
}
},
DeleteFunc: func(obj interface{}) {
if err := observer.Notify(ctx, Event(obj)); err != nil {
logrus.WithError(err).Warn("error notifying informer observer of delete event")
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
if err := observer.Notify(ctx, Event(newObj)); err != nil {
logrus.WithError(err).Warn("error notifying informer observer of update event")
}
},
})
return nil
})
}
// SimpleSubject implements the minimum requirements of a Subject.
// It is useful as a basis in composing more complex subjects.
type SimpleSubject struct {
Observable
}
// Subscribe adds the subject to each of the given observables.
func (s *SimpleSubject) Subscribe(ctx context.Context, observables ...Observable) error {
if len(observables) == 0 {
return errors.New("at least one observable must be provided to subscribe")
}
for _, oba := range observables {
if oba == nil {
return errors.New("cannot subscribe to nil observable")
}
if err := oba.Add(ctx, s); err != nil {
return err
}
}
return nil
}
// Notify forwards the given event to the subject's observers without modification.
func (s *SimpleSubject) Notify(ctx context.Context, event Event) error {
return s.NotifyObservers(ctx, event)
}
// ConcurrentSubject forwards notifications to its observers concurrently.
type ConcurrentSubject struct {
Subject
sem *semaphore.Weighted
timeout time.Duration
}
func (c *ConcurrentSubject) NotifyObservers(ctx context.Context, event Event) error {
deadline, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()
g, gctx := errgroup.WithContext(deadline)
c.ForEach(func(observer Observer) error {
c.sem.Acquire(gctx, 1)
g.Go(func() error {
defer c.sem.Release(1)
return observer.Notify(gctx, event)
})
return nil
})
return g.Wait()
}
// QueueSubject uses a workqueue to act as a notification queue to its observers.
type QueueSubject struct {
Subject
queue workqueue.RateLimitingInterface
once sync.Once
maxRequeues int
workers int
sem *semaphore.Weighted
timeout time.Duration
logger logrus.Logger
}
// Notify adds the given event to the subject's queue and calls NotifyObservers
// to start processing the queue.
func (q *QueueSubject) Notify(ctx context.Context, event Event) error {
q.queue.Add(event)
return q.NotifyObservers(ctx, event)
}
// NotifyObservers starts queue processing the first time it's called.
// Subsequent calls to this method are idempotent.
func (q *QueueSubject) NotifyObservers(ctx context.Context, event Event) error {
// Only start once
q.once.Do(func() {
// TODO: Move worker setup to constructor
go func() {
// Process queue items
if err := q.process(ctx); err != nil {
q.logger.WithError(err).Panic("error processing queue")
}
}()
})
return nil
}
func (q *QueueSubject) process(ctx context.Context) error {
// Schedule a deadline context for workers to start
deadline, cancel := context.WithTimeout(ctx, q.timeout)
defer cancel()
// Create an error group to wait for workers to complete
g, gctx := errgroup.WithContext(ctx)
for w := 0; w < q.workers; w++ {
if err := q.sem.Acquire(deadline, 1); err != nil {
return err
}
g.Go(func() error {
defer q.sem.Release(1)
for q.work(gctx) {
}
return nil
})
}
return g.Wait()
}
func (q *QueueSubject) work(ctx context.Context) bool {
queue := q.queue
item, quit := queue.Get()
if quit {
return false
}
defer queue.Done(item)
// Schedule a work deadline
deadline, cancel := context.WithTimeout(ctx, q.timeout)
defer cancel()
logger := q.logger.WithField("item", item)
logger.Debug("notifying observers")
// Requeue on error
if requeues, err := queue.NumRequeues(item), q.Subject.Notify(deadline, item); err != nil {
logger = logger.WithError(err).WithFields(logrus.Fields{
"requeues": requeues,
"limit": q.maxRequeues,
})
if requeues < q.maxRequeues {
logger.Debug("requeuing item")
queue.AddRateLimited(item)
return true
}
logger.Debug("requeue limit reached, forgetting item")
}
queue.Forget(item)
return true
}
// CacheSubject searches its cache for items matching notification events and
// notifies its observers with events containing those items if found.
type CacheSubject struct {
Subject
indexer cache.Indexer
keyFunc cache.KeyFunc
logger logrus.Logger
}
// Notify notifies the subject of a potential change in its cache.
// This method attempts to retrieve a stored item matching the key derived from the given
// event and registered KeyFunc. If a matching item is found, the subject's observers are
// notified with a new event containing it.
func (c *CacheSubject) Notify(ctx context.Context, event Event) error {
key, err := c.keyFunc(event)
if err != nil {
// Ignore key func errors
c.logger.WithError(err).WithField("event", event).Debug("failed to build key")
return nil
}
logger := c.logger.WithField("key", key)
item, exists, err := c.indexer.GetByKey(key)
if err != nil {
return err
}
if !exists {
logger.Debug("not found in cache")
return nil
}
out := Event(item)
return c.Subject.Notify(ctx, out)
}
// HandleSubscription provides a strongly-typed interface for handling changes
// to unversioned Subscriptions.
type HandleSubscription func(*operators.Subscription) error
// SubscriptionObserver is a consumer of Subscription events, invoking its handler when notified.
// It ignores all events not of dynamic type Subscription.
type SubscriptionObserver struct {
Observer
handle HandleSubscription
logger logrus.Logger
}
// Notify notifies the observer of a Subscription event, which invokes its handler.
// This method ignores all events not of dynamic type Subscription.
func (s *SubscriptionObserver) Notify(ctx context.Context, event Event) error {
sub, ok := event.(*operators.Subscription)
if !ok {
// Skip since the subject and its observers don't care about events of this type
s.logger.WithField("event", event).Debugf("event not of type %T, skipping", new(*v1alpha1.Subscription))
return nil
}
return s.handle(sub)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment