Kubernetes controller example — related blog post: https://medium.com/@fntlnz/what-i-learnt-about-kubernetes-controllers-db7591531973
# Gopkg.toml — dep constraints for this example
[[constraint]]
  name = "k8s.io/api"
  version = "kubernetes-1.11.0"

[[constraint]]
  name = "k8s.io/apimachinery"
  version = "kubernetes-1.11.0"

[[constraint]]
  name = "k8s.io/client-go"
  version = "kubernetes-1.11.0"

[prune]
  go-tests = true
  unused-packages = true

[[override]]
  name = "github.com/json-iterator/go"
  version = "1.1.5"
package main | |
import ( | |
"fmt" | |
"time" | |
"go.uber.org/zap" | |
"k8s.io/api/core/v1" | |
"k8s.io/apimachinery/pkg/fields" | |
"k8s.io/apimachinery/pkg/util/runtime" | |
"k8s.io/apimachinery/pkg/util/wait" | |
"k8s.io/client-go/kubernetes" | |
"k8s.io/client-go/tools/cache" | |
"k8s.io/client-go/tools/clientcmd" | |
"k8s.io/client-go/util/workqueue" | |
) | |
// Controller consumes Pod keys from a rate-limited work queue that is fed
// by a shared informer, and processes each key in syncToStdout.
type Controller struct {
	informer cache.SharedInformer            // delivers Pod events and caches the objects behind the keys
	queue    workqueue.RateLimitingInterface // pending object keys, with retry/backoff accounting
	logger   *zap.Logger                     // no-op until replaced via WithLogger
}
func NewController(queue workqueue.RateLimitingInterface, informer cache.SharedInformer) *Controller { | |
return &Controller{ | |
informer: informer, | |
queue: queue, | |
logger: zap.NewNop(), | |
} | |
} | |
func (c *Controller) WithLogger(logger *zap.Logger) { | |
c.logger = logger | |
} | |
func (c *Controller) processNextItem() bool { | |
key, quit := c.queue.Get() | |
if quit { | |
return false | |
} | |
defer c.queue.Done(key) | |
err := c.syncToStdout(key.(string)) | |
c.handleErr(err, key) | |
return true | |
} | |
func (c *Controller) syncToStdout(key string) error { | |
obj, exists, err := c.informer.GetStore().GetByKey(key) | |
if err != nil { | |
c.logger.Error("error fetching object from index for the specified key", zap.String("key", key), zap.Error(err)) | |
return err | |
} | |
if !exists { | |
c.logger.Info("pod has gone", zap.String("key", key)) | |
// do your heavy stuff for when the pod is gone here | |
} else { | |
c.logger.Info("update received for pod", zap.String("key", key), zap.String("pod", obj.(*v1.Pod).GetName())) | |
// do your heavy stuff for when the pod is created/updated here | |
} | |
return nil | |
} | |
func (c *Controller) handleErr(err error, key interface{}) { | |
if err == nil { | |
c.queue.Forget(key) | |
return | |
} | |
if c.queue.NumRequeues(key) < 5 { | |
c.logger.Info("error during sync", zap.String("key", key.(string)), zap.Error(err)) | |
c.queue.AddRateLimited(key) | |
return | |
} | |
c.queue.Forget(key) | |
runtime.HandleError(err) | |
c.logger.Info("drop pod out of queue after many retries", zap.String("key", key.(string)), zap.Error(err)) | |
} | |
func (c *Controller) Run(threadiness int, stopCh chan struct{}) { | |
defer runtime.HandleCrash() | |
defer c.queue.ShutDown() | |
c.logger.Info("starting controller") | |
go c.informer.Run(stopCh) | |
if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) { | |
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync")) | |
return | |
} | |
for i := 0; i < threadiness; i++ { | |
go wait.Until(c.runWorker, time.Second, stopCh) | |
} | |
<-stopCh | |
c.logger.Info("stopping controller") | |
} | |
func (c *Controller) runWorker() { | |
for c.processNextItem() { | |
} | |
} | |
func main() { | |
logger, err := zap.NewProduction() | |
if err != nil { | |
panic(err) | |
} | |
rules := clientcmd.NewDefaultClientConfigLoadingRules() | |
overrides := &clientcmd.ConfigOverrides{} | |
config, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, overrides).ClientConfig() | |
if err != nil { | |
logger.Fatal("error getting kubernetes client config", zap.Error(err)) | |
} | |
clientset, err := kubernetes.NewForConfig(config) | |
if err != nil { | |
logger.Fatal("error creating kubernetes client", zap.Error(err)) | |
} | |
podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything()) | |
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) | |
si := cache.NewSharedInformer(podListWatcher, &v1.Pod{}, 0) | |
si.AddEventHandler(cache.ResourceEventHandlerFuncs{ | |
AddFunc: func(obj interface{}) { | |
key, err := cache.MetaNamespaceKeyFunc(obj) | |
if err == nil { | |
queue.Add(key) | |
} | |
}, | |
UpdateFunc: func(old interface{}, new interface{}) { | |
key, err := cache.MetaNamespaceKeyFunc(new) | |
if err == nil { | |
queue.Add(key) | |
} | |
}, | |
DeleteFunc: func(obj interface{}) { | |
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) | |
if err == nil { | |
queue.Add(key) | |
} | |
}, | |
}) | |
controller := NewController(queue, si) | |
controller.WithLogger(logger) | |
stop := make(chan struct{}) | |
defer close(stop) | |
go controller.Run(1, stop) | |
select {} | |
} |
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.