Skip to content

Instantly share code, notes, and snippets.

@fntlnz
Last active October 14, 2020 18:32
Show Gist options
  • Save fntlnz/65ec4e7273b15e858dda97c0f6f2241b to your computer and use it in GitHub Desktop.
Save fntlnz/65ec4e7273b15e858dda97c0f6f2241b to your computer and use it in GitHub Desktop.
[[constraint]]
name = "k8s.io/api"
version = "kubernetes-1.11.0"
[[constraint]]
name = "k8s.io/apimachinery"
version = "kubernetes-1.11.0"
[[constraint]]
name = "k8s.io/client-go"
version = "kubernetes-1.11.0"
[prune]
go-tests = true
unused-packages = true
[[override]]
name = "github.com/json-iterator/go"
version = "1.1.5"
package main
import (
"fmt"
"time"
"go.uber.org/zap"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/workqueue"
)
type Controller struct {
informer cache.SharedInformer
queue workqueue.RateLimitingInterface
logger *zap.Logger
}
func NewController(queue workqueue.RateLimitingInterface, informer cache.SharedInformer) *Controller {
return &Controller{
informer: informer,
queue: queue,
logger: zap.NewNop(),
}
}
func (c *Controller) WithLogger(logger *zap.Logger) {
c.logger = logger
}
func (c *Controller) processNextItem() bool {
key, quit := c.queue.Get()
if quit {
return false
}
defer c.queue.Done(key)
err := c.syncToStdout(key.(string))
c.handleErr(err, key)
return true
}
func (c *Controller) syncToStdout(key string) error {
obj, exists, err := c.informer.GetStore().GetByKey(key)
if err != nil {
c.logger.Error("error fetching object from index for the specified key", zap.String("key", key), zap.Error(err))
return err
}
if !exists {
c.logger.Info("pod has gone", zap.String("key", key))
// do your heavy stuff for when the pod is gone here
} else {
c.logger.Info("update received for pod", zap.String("key", key), zap.String("pod", obj.(*v1.Pod).GetName()))
// do your heavy stuff for when the pod is created/updated here
}
return nil
}
func (c *Controller) handleErr(err error, key interface{}) {
if err == nil {
c.queue.Forget(key)
return
}
if c.queue.NumRequeues(key) < 5 {
c.logger.Info("error during sync", zap.String("key", key.(string)), zap.Error(err))
c.queue.AddRateLimited(key)
return
}
c.queue.Forget(key)
runtime.HandleError(err)
c.logger.Info("drop pod out of queue after many retries", zap.String("key", key.(string)), zap.Error(err))
}
func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
defer runtime.HandleCrash()
defer c.queue.ShutDown()
c.logger.Info("starting controller")
go c.informer.Run(stopCh)
if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
return
}
for i := 0; i < threadiness; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}
<-stopCh
c.logger.Info("stopping controller")
}
func (c *Controller) runWorker() {
for c.processNextItem() {
}
}
func main() {
logger, err := zap.NewProduction()
if err != nil {
panic(err)
}
rules := clientcmd.NewDefaultClientConfigLoadingRules()
overrides := &clientcmd.ConfigOverrides{}
config, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, overrides).ClientConfig()
if err != nil {
logger.Fatal("error getting kubernetes client config", zap.Error(err))
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
logger.Fatal("error creating kubernetes client", zap.Error(err))
}
podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything())
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
si := cache.NewSharedInformer(podListWatcher, &v1.Pod{}, 0)
si.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key)
}
},
UpdateFunc: func(old interface{}, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
if err == nil {
queue.Add(key)
}
},
DeleteFunc: func(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key)
}
},
})
controller := NewController(queue, si)
controller.WithLogger(logger)
stop := make(chan struct{})
defer close(stop)
go controller.Run(1, stop)
select {}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment