Skip to content

Instantly share code, notes, and snippets.

@zeusro
Created May 20, 2021 04:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zeusro/af1bd74287aededd0f90a20c68c342cf to your computer and use it in GitHub Desktop.
Save zeusro/af1bd74287aededd0f90a20c68c342cf to your computer and use it in GitHub Desktop.
pod-informer.go
package main
import (
"context"
"flag"
"fmt"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
k8sruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
"log"
"path/filepath"
"time"
)
// main builds a Kubernetes client from a kubeconfig file, starts a shared
// informer over Pods in the "titan-system" namespace that carry the label
// controller=titan-operator, and dispatches Pod updates to onUpdate.
//
// The function blocks after the initial cache sync so the informer keeps
// delivering events; the process runs until it is terminated externally.
func main() {
	fmt.Println("监听特定label pod的更新,其他资源同理")

	// Default the kubeconfig path to ~/.kube/config when a home directory is
	// known; otherwise the flag must be supplied explicitly.
	var kubeconfig *string
	if home := homedir.HomeDir(); home != "" {
		kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
	} else {
		kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
	}
	flag.Parse()

	config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
	if err != nil {
		panic(err)
	}

	// Initialise the client.
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		log.Panic(err.Error())
	}

	// Closing stopper shuts the informer down; the deferred close covers the
	// early-return path below.
	stopper := make(chan struct{})
	defer close(stopper)

	// The selector never changes, so render it once instead of on every
	// list/watch call.
	selector := labels.Set(map[string]string{"controller": "titan-operator"}).AsSelector().String()
	namespace := "titan-system"

	informer := cache.NewSharedIndexInformer(
		&cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (k8sruntime.Object, error) {
				options.LabelSelector = selector
				return clientset.CoreV1().Pods(namespace).List(context.TODO(), options)
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				options.LabelSelector = selector
				return clientset.CoreV1().Pods(namespace).Watch(context.TODO(), options)
			},
		},
		&corev1.Pod{},
		time.Second*2, // resync every 2 seconds
		cache.Indexers{},
	)
	defer runtime.HandleCrash()

	// Register the custom handler before the informer starts so that no
	// update occurring between the initial sync and registration is missed.
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		UpdateFunc: onUpdate,
	})

	// Start the informer (list & watch) in the background.
	go informer.Run(stopper)

	// Wait for the initial list from the apiserver to populate the cache.
	if !cache.WaitForCacheSync(stopper, informer.HasSynced) {
		runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
		return
	}
	fmt.Println("end1")

	// Keep the process alive so the handler can receive events. Nothing in
	// this function closes stopper before return, so this receive blocks
	// until the process is terminated; returning here instead would close
	// stopper and stop the informer immediately.
	<-stopper
	fmt.Println("end2")
}
// onUpdate is the informer's update callback. It reacts only when a Pod
// moves from the Pending phase to the Running phase and ignores every other
// update.
//
// oldObj and newObj are expected to be *corev1.Pod values. Anything else is
// reported and skipped rather than allowed to panic inside the informer's
// handler goroutine.
func onUpdate(oldObj, newObj interface{}) {
	oldPod, ok := oldObj.(*corev1.Pod)
	if !ok {
		fmt.Printf("onUpdate: unexpected old object type %T\n", oldObj)
		return
	}
	newPod, ok := newObj.(*corev1.Pod)
	if !ok {
		fmt.Printf("onUpdate: unexpected new object type %T\n", newObj)
		return
	}

	// Only the Pending -> Running transition is of interest.
	if oldPod.Status.Phase != corev1.PodPending || newPod.Status.Phase != corev1.PodRunning {
		return
	}
	fmt.Println("need to update cr status", newPod.Status.Phase)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment