@slok
Created June 4, 2018 15:36
Kubernetes controller that updates an annotation on pods with the `test: kooper` label
package main

import (
	"os"
	"path/filepath"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	_ "k8s.io/client-go/plugin/pkg/client/auth/oidc"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/homedir"

	"github.com/spotahome/kooper/log"
	"github.com/spotahome/kooper/operator/controller"
	"github.com/spotahome/kooper/operator/handler"
	"github.com/spotahome/kooper/operator/retrieve"
)

func main() {
	// Initialize logger.
	log := &log.Std{}

	// Get k8s client.
	k8scfg, err := rest.InClusterConfig()
	if err != nil {
		// Not running in a cluster? Let's try the local kubeconfig.
		kubehome := filepath.Join(homedir.HomeDir(), ".kube", "config")
		k8scfg, err = clientcmd.BuildConfigFromFlags("", kubehome)
		if err != nil {
			log.Errorf("error loading kubernetes configuration: %s", err)
			os.Exit(1)
		}
	}
	k8scli, err := kubernetes.NewForConfig(k8scfg)
	if err != nil {
		log.Errorf("error creating kubernetes client: %s", err)
		os.Exit(1)
	}

	// Create our retriever so the controller knows how to get/listen for pod events.
	retr := &retrieve.Resource{
		Object: &corev1.Pod{},
		ListerWatcher: &cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
				// Just apply logic on this type of pods.
				options.LabelSelector = "test=kooper"
				return k8scli.CoreV1().Pods("").List(options)
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				// Just apply logic on this type of pods.
				options.LabelSelector = "test=kooper"
				return k8scli.CoreV1().Pods("").Watch(options)
			},
		},
	}

	// Our domain logic that will update annotations every add/sync/update of filtered pods.
	hand := &handler.HandlerFunc{
		AddFunc: func(obj runtime.Object) error {
			pod := obj.(*corev1.Pod)
			log.Infof("Pod being handled: %s", pod.ObjectMeta.Name)

			// Copy pod and update the annotation.
			newPod := pod.DeepCopy()
			ann := newPod.ObjectMeta.Annotations
			if ann == nil {
				ann = make(map[string]string)
			}
			ann["kooper"] = "handled"
			newPod.ObjectMeta.Annotations = ann
			_, err := k8scli.CoreV1().Pods(newPod.ObjectMeta.Namespace).Update(newPod)
			if err != nil {
				return err
			}
			log.Infof("Pod annotation updated: %s", pod.ObjectMeta.Name)
			return nil
		},
		DeleteFunc: func(s string) error {
			log.Infof("Pod deleted: %s", s)
			return nil
		},
	}

	// Create the controller that will refresh every 30 seconds.
	ctrl := controller.NewSequential(30*time.Second, hand, retr, nil, log)

	// Start our controller.
	stopC := make(chan struct{})
	if err := ctrl.Run(stopC); err != nil {
		log.Errorf("error running controller: %s", err)
		os.Exit(1)
	}
	os.Exit(0)
}

slok commented Jun 4, 2018

Test

apiVersion: apps/v1
kind: Deployment
metadata:
  name: nginx-deployment
spec:
  replicas: 1
  selector:
    matchLabels:
      app: nginx
  template:
    metadata:
      labels:
        app: nginx
        test: kooper
    spec:
      containers:
      - name: nginx
        image: nginx
        ports:
        - containerPort: 80

Listen to Resources

We watch corev1.Pod resources in all namespaces (the empty string passed to Pods("") means every namespace) and keep only the pods that carry the test: kooper label. WatchFunc sets the same selector:

ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
    // Just apply logic on this type of pods.
    options.LabelSelector = "test=kooper"
    return k8scli.CoreV1().Pods("").List(options)
},

https://gist.github.com/slok/65f5a9b395830fc6c9541573ede2c695#file-main-go-L50-LL58
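
To make the effect of the "test=kooper" selector concrete, here is a minimal, standard-library-only sketch of equality-based label matching. The helper name matchesSelector is hypothetical and not part of client-go or kooper; in the gist the filtering is done server-side by the Kubernetes API. The sketch only understands key=value requirements, not the other operators that real selectors accept.

```go
package main

import (
	"fmt"
	"strings"
)

// matchesSelector reports whether labels satisfy a comma-separated list of
// equality requirements such as "test=kooper,app=nginx".
// Hypothetical helper for illustration only: it handles just the key=value
// form, not the full Kubernetes label selector syntax.
func matchesSelector(selector string, labels map[string]string) bool {
	for _, req := range strings.Split(selector, ",") {
		req = strings.TrimSpace(req)
		if req == "" {
			continue
		}
		key, want, ok := strings.Cut(req, "=")
		if !ok {
			return false // not a key=value requirement; out of scope here
		}
		if got, found := labels[key]; !found || got != want {
			return false
		}
	}
	return true
}

func main() {
	nginx := map[string]string{"app": "nginx", "test": "kooper"}
	other := map[string]string{"app": "nginx"}

	fmt.Println(matchesSelector("test=kooper", nginx)) // true
	fmt.Println(matchesSelector("test=kooper", other)) // false
}
```

With the test Deployment above, the nginx pod carries both labels, so it matches and reaches the handler; a pod without test: kooper is never delivered to the controller.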

Update pod annotation

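We make a deep copy of the pod (objects handed to the handler come from a shared cache and should not be mutated in place), create the annotations map if it does not exist yet, set the kooper: handled annotation, and send the updated object with the Kubernetes client.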

newPod := pod.DeepCopy()
// ... update annotations in object
_, err := k8scli.CoreV1().Pods(newPod.ObjectMeta.Namespace).Update(newPod)

https://gist.github.com/slok/65f5a9b395830fc6c9541573ede2c695#file-main-go-L69-L81
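
The copy-then-annotate step can be sketched on plain maps, without any Kubernetes dependency. The helpers withAnnotation and alreadyHandled are hypothetical names introduced here for illustration; they are not in the gist. The second one shows one possible way, under the assumption that you want fewer API calls, to skip the Update when the annotation is already in place, since the handler runs again on every 30-second resync.

```go
package main

import "fmt"

// withAnnotation returns a copy of ann with key set to value.
// It never mutates its input and works when ann is nil, which mirrors the
// DeepCopy plus nil-check done in the handler. Hypothetical helper.
func withAnnotation(ann map[string]string, key, value string) map[string]string {
	out := make(map[string]string, len(ann)+1)
	for k, v := range ann {
		out[k] = v
	}
	out[key] = value
	return out
}

// alreadyHandled reports whether ann already holds key=value, in which case
// a caller could skip the Update call. Reading a nil map is safe in Go.
func alreadyHandled(ann map[string]string, key, value string) bool {
	got, ok := ann[key]
	return ok && got == value
}

func main() {
	var none map[string]string // a pod without annotations has a nil map

	fmt.Println(alreadyHandled(none, "kooper", "handled")) // false

	updated := withAnnotation(none, "kooper", "handled")
	fmt.Println(updated["kooper"], none == nil)               // handled true
	fmt.Println(alreadyHandled(updated, "kooper", "handled")) // true
}
```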