Skip to content

Instantly share code, notes, and snippets.

@xigang
Created March 19, 2023 13:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save xigang/3c8f222df2d5a9e881d6b35be34aa5b7 to your computer and use it in GitHub Desktop.
Save xigang/3c8f222df2d5a9e881d6b35be34aa5b7 to your computer and use it in GitHub Desktop.
pod informer cache test case
package main
import (
"encoding/json"
"flag"
"fmt"
"os"
"path/filepath"
"time"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
klog "k8s.io/klog/v2"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/component-base/logs"
)
// PodLoggingController logs the name and namespace of pods that are added,
// deleted, or updated
type PodLoggingController struct {
informerFactory informers.SharedInformerFactory
podInformer coreinformers.PodInformer
}
// Run starts shared informers and waits for the shared informer cache to
// synchronize.
func (c *PodLoggingController) Run(stopCh chan struct{}) error {
// Starts all the shared informers that have been created by the factory so
// far.
c.informerFactory.Start(stopCh)
// wait for the initial synchronization of the local cache.
if !cache.WaitForCacheSync(stopCh, c.podInformer.Informer().HasSynced) {
return fmt.Errorf("failed to sync")
}
return nil
}
func (c *PodLoggingController) podAdd(obj interface{}) {
pod := obj.(*v1.Pod)
cachePod, err := c.podInformer.Lister().Pods(pod.Namespace).Get(pod.Name)
if err != nil {
klog.Infof("Failed to get pods from cache: %v", err)
return
}
data, _ := json.MarshalIndent(&cachePod, " ", " ")
fmt.Printf("cache pod: %+v\n", string(data))
}
func (c *PodLoggingController) podUpdate(old, new interface{}) {
}
func (c *PodLoggingController) podDelete(obj interface{}) {
}
// NewPodLoggingController creates a PodLoggingController
func NewPodLoggingController(informerFactory informers.SharedInformerFactory) (*PodLoggingController, error) {
podInformer := informerFactory.Core().V1().Pods()
c := &PodLoggingController{
informerFactory: informerFactory,
podInformer: podInformer,
}
_, err := podInformer.Informer().AddEventHandler(
// Your custom resource event handlers.
cache.ResourceEventHandlerFuncs{
// Called on creation
AddFunc: c.podAdd,
// Called on resource update and every resyncPeriod on existing resources.
UpdateFunc: c.podUpdate,
// Called on resource deletion.
DeleteFunc: c.podDelete,
},
)
if err != nil {
return nil, err
}
return c, nil
}
var kubeconfig string
func init() {
flag.StringVar(&kubeconfig, "kubeconfig", filepath.Join(os.Getenv("HOME"), ".kube", "config"), "absolute path to the kubeconfig file")
}
func main() {
flag.Parse()
logs.InitLogs()
defer logs.FlushLogs()
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
panic(err.Error())
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
klog.Fatal(err)
}
factory := informers.NewSharedInformerFactory(clientset, time.Hour*24)
controller, err := NewPodLoggingController(factory)
if err != nil {
klog.Fatal(err)
}
stop := make(chan struct{})
defer close(stop)
err = controller.Run(stop)
if err != nil {
klog.Fatal(err)
}
select {}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment