-
-
Save aledbf/030f90fd8fe087048782fe8d0cfd9ce2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
Copyright 2015 The Kubernetes Authors. | |
Licensed under the Apache License, Version 2.0 (the "License"); | |
you may not use this file except in compliance with the License. | |
You may obtain a copy of the License at | |
http://www.apache.org/licenses/LICENSE-2.0 | |
Unless required by applicable law or agreed to in writing, software | |
distributed under the License is distributed on an "AS IS" BASIS, | |
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
See the License for the specific language governing permissions and | |
limitations under the License. | |
*/ | |
package main | |
import ( | |
"flag" | |
"fmt" | |
"os" | |
"os/signal" | |
"syscall" | |
"time" | |
"github.com/google/go-cmp/cmp" | |
"github.com/google/go-cmp/cmp/cmpopts" | |
corev1 "k8s.io/api/core/v1" | |
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | |
"k8s.io/apimachinery/pkg/fields" | |
"k8s.io/apimachinery/pkg/util/wait" | |
discovery "k8s.io/apimachinery/pkg/version" | |
kubeinformers "k8s.io/client-go/informers" | |
"k8s.io/client-go/kubernetes" | |
cache "k8s.io/client-go/tools/cache" | |
"k8s.io/client-go/tools/clientcmd" | |
"k8s.io/ingress-nginx/internal/k8s" | |
"k8s.io/klog" | |
) | |
func main() { | |
klog.InitFlags(nil) | |
flag.Set("logtostderr", "true") | |
flag.CommandLine.Parse([]string{}) | |
kubeClient, err := createApiserverClient("http://127.0.0.1:8001", "/home/aledbf/.kube/config") | |
if err != nil { | |
handleFatalInitError(err) | |
} | |
// stop channel | |
stopCh := make(chan struct{}) | |
// event channel (temporal) | |
eventCh := make(chan string) | |
// configmap to watch | |
key := "default/test" | |
klog.Infof("Watching configmap %v", key) | |
w, err := watchConfigmap(key, eventCh, stopCh, kubeClient) | |
if err != nil { | |
handleFatalInitError(err) | |
} | |
go func(w *watcher, eventCh chan string) { | |
for { | |
select { | |
case reason := <-eventCh: | |
// for now just show a string with event and the configmap | |
klog.Infof("[K8S data change] - reason: %v", reason) | |
configmap, err := w.GetConfigMap() | |
if err != nil { | |
klog.Error(err) | |
continue | |
} | |
klog.Infof("[K8S object] %v", configmap) | |
case <-stopCh: | |
break | |
} | |
} | |
}(w, eventCh) | |
signalChan := make(chan os.Signal, 1) | |
signal.Notify(signalChan, syscall.SIGTERM) | |
<-signalChan | |
klog.Info("Received SIGTERM, shutting down") | |
time.Sleep(10 * time.Second) | |
} | |
type ConfigmapWatcher interface { | |
GetConfigMap() (*corev1.ConfigMap, error) | |
} | |
func NewConfigmapWatcher(key string, eventCh chan string, stopCh chan struct{}, kubeClient kubernetes.Interface) (ConfigmapWatcher, error) { | |
w, err := watchConfigmap(key, eventCh, stopCh, kubeClient) | |
if err != nil { | |
return nil, err | |
} | |
return w, nil | |
} | |
type watcher struct { | |
object *corev1.ConfigMap | |
} | |
func (w *watcher) GetConfigMap() (*corev1.ConfigMap, error) { | |
return w.object, nil | |
} | |
func watchConfigmap(key string, eventCh chan string, stopCh chan struct{}, kubeClient kubernetes.Interface) (*watcher, error) { | |
ns, name, err := k8s.ParseNameNS(key) | |
if err != nil { | |
return nil, err | |
} | |
kubeInformerFactory := kubeinformers.NewFilteredSharedInformerFactory(kubeClient, 0, ns, | |
func(options *metav1.ListOptions) { | |
options.FieldSelector = fields.OneTermEqualSelector("metadata.name", name).String() | |
}, | |
) | |
w := &watcher{} | |
informer := kubeInformerFactory.Core().V1().ConfigMaps().Informer() | |
var remove func(obj interface{}) | |
remove = func(obj interface{}) { | |
switch obj := obj.(type) { | |
case cache.DeletedFinalStateUnknown: | |
remove(obj.Obj) | |
break | |
default: | |
w.object = nil | |
} | |
eventCh <- "configmap removed" | |
} | |
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ | |
AddFunc: func(obj interface{}) { | |
w.object = obj.(*corev1.ConfigMap) | |
eventCh <- "adding configmap" | |
}, | |
UpdateFunc: func(old, cur interface{}) { | |
if cmp.Equal(old, cur, | |
cmpopts.IgnoreFields(metav1.ObjectMeta{}, "ResourceVersion")) { | |
return | |
} | |
w.object = cur.(*corev1.ConfigMap) | |
eventCh <- fmt.Sprintf("updating configmap to %v", w.object.Data) | |
}, | |
DeleteFunc: remove, | |
}) | |
// start the informer in a goroutine | |
go kubeInformerFactory.Start(stopCh) | |
return w, nil | |
} | |
// ---------------------------- boilerplate ------------------------------------------------------ | |
// createApiserverClient creates a new Kubernetes REST client. apiserverHost is | |
// the URL of the API server in the format protocol://address:port/pathPrefix, | |
// kubeConfig is the location of a kubeconfig file. If defined, the kubeconfig | |
// file is loaded first, the URL of the API server read from the file is then | |
// optionally overridden by the value of apiserverHost. | |
// If neither apiserverHost nor kubeConfig is passed in, we assume the | |
// controller runs inside Kubernetes and fallback to the in-cluster config. If | |
// the in-cluster config is missing or fails, we fallback to the default config. | |
func createApiserverClient(apiserverHost, kubeConfig string) (*kubernetes.Clientset, error) { | |
cfg, err := clientcmd.BuildConfigFromFlags(apiserverHost, kubeConfig) | |
if err != nil { | |
return nil, err | |
} | |
klog.Infof("Creating API client for %s", cfg.Host) | |
client, err := kubernetes.NewForConfig(cfg) | |
if err != nil { | |
return nil, err | |
} | |
var v *discovery.Info | |
// The client may fail to connect to the API server in the first request. | |
// https://github.com/kubernetes/ingress-nginx/issues/1968 | |
defaultRetry := wait.Backoff{ | |
Steps: 10, | |
Duration: 1 * time.Second, | |
Factor: 1.5, | |
Jitter: 0.1, | |
} | |
var lastErr error | |
retries := 0 | |
klog.V(2).Info("Trying to discover Kubernetes version") | |
err = wait.ExponentialBackoff(defaultRetry, func() (bool, error) { | |
v, err = client.Discovery().ServerVersion() | |
if err == nil { | |
return true, nil | |
} | |
lastErr = err | |
klog.V(2).Infof("Unexpected error discovering Kubernetes version (attempt %v): %v", retries, err) | |
retries++ | |
return false, nil | |
}) | |
// err is returned in case of timeout in the exponential backoff (ErrWaitTimeout) | |
if err != nil { | |
return nil, lastErr | |
} | |
// this should not happen, warn the user | |
if retries > 0 { | |
klog.Warningf("Initial connection to the Kubernetes API server was retried %d times.", retries) | |
} | |
klog.Infof("Running in Kubernetes cluster version v%v.%v (%v) - git (%v) commit %v - platform %v", | |
v.Major, v.Minor, v.GitVersion, v.GitTreeState, v.GitCommit, v.Platform) | |
return client, nil | |
} | |
// Handler for fatal init errors. Prints a verbose error message and exits. | |
func handleFatalInitError(err error) { | |
klog.Fatalf("Error while initiating a connection to the Kubernetes API server. "+ | |
"This could mean the cluster is misconfigured (e.g. it has invalid API server certificates "+ | |
"or Service Accounts configuration). Reason: %s\n"+ | |
"Refer to the troubleshooting guide for more information: "+ | |
"https://kubernetes.github.io/ingress-nginx/troubleshooting/", | |
err) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
kubectl proxy
go run watch-single-object.go