Created
March 19, 2023 13:34
-
-
Save xigang/3c8f222df2d5a9e881d6b35be34aa5b7 to your computer and use it in GitHub Desktop.
pod informer cache test case
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"encoding/json" | |
"flag" | |
"fmt" | |
"os" | |
"path/filepath" | |
"time" | |
"k8s.io/client-go/informers" | |
coreinformers "k8s.io/client-go/informers/core/v1" | |
"k8s.io/client-go/kubernetes" | |
klog "k8s.io/klog/v2" | |
v1 "k8s.io/api/core/v1" | |
"k8s.io/client-go/tools/cache" | |
"k8s.io/client-go/tools/clientcmd" | |
"k8s.io/component-base/logs" | |
) | |
// PodLoggingController logs the name and namespace of pods that are added, | |
// deleted, or updated | |
type PodLoggingController struct { | |
informerFactory informers.SharedInformerFactory | |
podInformer coreinformers.PodInformer | |
} | |
// Run starts shared informers and waits for the shared informer cache to | |
// synchronize. | |
func (c *PodLoggingController) Run(stopCh chan struct{}) error { | |
// Starts all the shared informers that have been created by the factory so | |
// far. | |
c.informerFactory.Start(stopCh) | |
// wait for the initial synchronization of the local cache. | |
if !cache.WaitForCacheSync(stopCh, c.podInformer.Informer().HasSynced) { | |
return fmt.Errorf("failed to sync") | |
} | |
return nil | |
} | |
func (c *PodLoggingController) podAdd(obj interface{}) { | |
pod := obj.(*v1.Pod) | |
cachePod, err := c.podInformer.Lister().Pods(pod.Namespace).Get(pod.Name) | |
if err != nil { | |
klog.Infof("Failed to get pods from cache: %v", err) | |
return | |
} | |
data, _ := json.MarshalIndent(&cachePod, " ", " ") | |
fmt.Printf("cache pod: %+v\n", string(data)) | |
} | |
func (c *PodLoggingController) podUpdate(old, new interface{}) { | |
} | |
func (c *PodLoggingController) podDelete(obj interface{}) { | |
} | |
// NewPodLoggingController creates a PodLoggingController | |
func NewPodLoggingController(informerFactory informers.SharedInformerFactory) (*PodLoggingController, error) { | |
podInformer := informerFactory.Core().V1().Pods() | |
c := &PodLoggingController{ | |
informerFactory: informerFactory, | |
podInformer: podInformer, | |
} | |
_, err := podInformer.Informer().AddEventHandler( | |
// Your custom resource event handlers. | |
cache.ResourceEventHandlerFuncs{ | |
// Called on creation | |
AddFunc: c.podAdd, | |
// Called on resource update and every resyncPeriod on existing resources. | |
UpdateFunc: c.podUpdate, | |
// Called on resource deletion. | |
DeleteFunc: c.podDelete, | |
}, | |
) | |
if err != nil { | |
return nil, err | |
} | |
return c, nil | |
} | |
var kubeconfig string | |
func init() { | |
flag.StringVar(&kubeconfig, "kubeconfig", filepath.Join(os.Getenv("HOME"), ".kube", "config"), "absolute path to the kubeconfig file") | |
} | |
func main() { | |
flag.Parse() | |
logs.InitLogs() | |
defer logs.FlushLogs() | |
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig) | |
if err != nil { | |
panic(err.Error()) | |
} | |
clientset, err := kubernetes.NewForConfig(config) | |
if err != nil { | |
klog.Fatal(err) | |
} | |
factory := informers.NewSharedInformerFactory(clientset, time.Hour*24) | |
controller, err := NewPodLoggingController(factory) | |
if err != nil { | |
klog.Fatal(err) | |
} | |
stop := make(chan struct{}) | |
defer close(stop) | |
err = controller.Run(stop) | |
if err != nil { | |
klog.Fatal(err) | |
} | |
select {} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment