Created July 18, 2023 11:14
stream logs
// streamLogs builds a shared pod informer for the given namespace, runs it,
// and hands every pod delivered to the AddFunc callback to startLogStreaming.
// After the informer cache sync call it blocks forever on an empty select.
//
// NOTE(review): this excerpt appears to have lost lines (call arguments,
// closing braces/parentheses, and the statement that registers the
// AddFunc/DeleteFunc/UpdateFunc callbacks on the informer). The notes below
// describe only what is visible; confirm against the complete file.
// NOTE(review): labelSelector is not referenced in any visible line, so how
// (or whether) it filters the watched pods cannot be confirmed from here.
func streamLogs(clientset kubernetes.Interface, namespace, labelSelector string) error {
// NOTE(review): ctx is not referenced in the visible lines; an unused local
// is a compile error in Go, so confirm it is used in the missing lines.
ctx := context.TODO()
// Watch for changes to pods that match the label selector
// NOTE(review): the argument list and closing parenthesis of this call are
// not present in this excerpt.
listWatcher := cache.NewListWatchFromClient(
// Create a pod informer to watch for changes to pods
podInformer := cache.NewSharedInformer(listWatcher, &corev1.Pod{}, 0)
// Callback invoked with each added object; presumably part of an event
// handler value registered on podInformer (registration not visible here).
AddFunc: func(obj interface{}) {
// Objects that are not *corev1.Pod fail this assertion.
pod, ok := obj.(*corev1.Pod)
// NOTE(review): the body of this guard is not visible; presumably it
// returns early so that a nil pod is never passed on.
if !ok {
// Start streaming logs from the newly created pod
// NOTE(review): startLogStreaming ignores pods whose phase is not Running,
// and UpdateFunc below has no visible body, so a pod that is added before it
// reaches Running does not appear to get a log stream later.
startLogStreaming(clientset, namespace, pod)
DeleteFunc: func(obj interface{}) {
// Handle pod deletion if needed
UpdateFunc: func(oldObj, newObj interface{}) {
// Handle pod updates if needed
// Start the pod informer to receive events about pods
stopCh := make(chan struct{})
// NOTE(review): because of the unconditional `select {}` at the end, this
// deferred close is never reached in the visible code.
defer close(stopCh)
go podInformer.Run(stopCh)
// Wait for the informer's cache to sync before streaming logs
// NOTE(review): the boolean result of WaitForCacheSync is discarded, so a
// failed sync is not reported to the caller.
cache.WaitForCacheSync(stopCh, podInformer.HasSynced)
// Wait forever to keep the log streaming running
// An empty select blocks this goroutine permanently; no visible path returns
// a value for the declared error result.
select {}
func startLogStreaming(clientset kubernetes.Interface, namespace string, pod *corev1.Pod) {
// Only handle pods that are running
if pod.Status.Phase != corev1.PodRunning {
podLogOpts := &corev1.PodLogOptions{
Container: pod.Spec.Containers[0].Name, // Assume there's only one container in the pod
Follow: true, // Set to true to stream logs continuously
req := clientset.CoreV1().Pods(namespace).GetLogs(pod.Name, podLogOpts)
readCloser, err := req.Stream(context.Background())
if err != nil {
fmt.Printf("failed to open log stream for pod %s: %v\n", pod.Name, err)
defer readCloser.Close()
// Start streaming logs from the pod
// This implementation simply writes the logs to stdout, but you can customize it as per your requirements.
go func() {
_, _ = io.Copy(os.Stdout, readCloser)
