Skip to content

Instantly share code, notes, and snippets.

@aledbf
Created June 29, 2019 20:34
Show Gist options
  • Save aledbf/030f90fd8fe087048782fe8d0cfd9ce2 to your computer and use it in GitHub Desktop.
Save aledbf/030f90fd8fe087048782fe8d0cfd9ce2 to your computer and use it in GitHub Desktop.
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"flag"
"fmt"
"os"
"os/signal"
"syscall"
"time"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/wait"
discovery "k8s.io/apimachinery/pkg/version"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
cache "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/ingress-nginx/internal/k8s"
"k8s.io/klog"
)
// main connects to the API server, starts a watcher for a single ConfigMap
// and logs every change reported through the event channel until the process
// receives SIGTERM.
func main() {
	klog.InitFlags(nil)
	if err := flag.Set("logtostderr", "true"); err != nil {
		klog.Warningf("Unable to set the logtostderr flag: %v", err)
	}
	if err := flag.CommandLine.Parse([]string{}); err != nil {
		klog.Warningf("Unable to parse flags: %v", err)
	}

	kubeClient, err := createApiserverClient("http://127.0.0.1:8001", "/home/aledbf/.kube/config")
	if err != nil {
		handleFatalInitError(err)
	}

	// stop channel: closed on shutdown to stop the informer and the event loop
	stopCh := make(chan struct{})
	// event channel (temporary)
	eventCh := make(chan string)

	// configmap to watch
	key := "default/test"
	klog.Infof("Watching configmap %v", key)

	w, err := watchConfigmap(key, eventCh, stopCh, kubeClient)
	if err != nil {
		handleFatalInitError(err)
	}

	// done is closed once the event loop goroutine has returned.
	done := make(chan struct{})
	go func(w *watcher, eventCh chan string) {
		defer close(done)
		for {
			select {
			case reason := <-eventCh:
				// for now just show a string with event and the configmap
				klog.Infof("[K8S data change] - reason: %v", reason)
				configmap, err := w.GetConfigMap()
				if err != nil {
					klog.Error(err)
					continue
				}
				klog.Infof("[K8S object] %v", configmap)
			case <-stopCh:
				// A bare break would only leave the select statement and the
				// loop would keep spinning; return terminates the goroutine.
				return
			}
		}
	}(w, eventCh)

	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, syscall.SIGTERM)
	<-signalChan

	klog.Info("Received SIGTERM, shutting down")

	// Signal the informer and the event loop to stop, then wait for the event
	// loop to finish, but never longer than the previous fixed grace period.
	close(stopCh)
	select {
	case <-done:
	case <-time.After(10 * time.Second):
	}
}
// ConfigmapWatcher gives read access to the ConfigMap tracked by a watcher.
type ConfigmapWatcher interface {
// GetConfigMap returns the tracked ConfigMap together with an error value.
GetConfigMap() (*corev1.ConfigMap, error)
}
// NewConfigmapWatcher starts watching the ConfigMap identified by key through
// watchConfigmap and exposes the result as a ConfigmapWatcher.
//
// On failure an untyped nil interface is returned alongside the error, so
// callers can safely compare the result against nil.
func NewConfigmapWatcher(key string, eventCh chan string, stopCh chan struct{}, kubeClient kubernetes.Interface) (ConfigmapWatcher, error) {
	cmWatcher, watchErr := watchConfigmap(key, eventCh, stopCh, kubeClient)
	if watchErr == nil {
		return cmWatcher, nil
	}
	return nil, watchErr
}
// watcher stores the latest state of the single ConfigMap being watched.
type watcher struct {
// object is assigned by the informer event handlers registered in
// watchConfigmap (set on add/update, reset to nil on delete) and is read
// by GetConfigMap.
// NOTE(review): these accesses are not guarded by a lock; confirm whether
// the handlers and the readers can run concurrently.
object *corev1.ConfigMap
}
// GetConfigMap returns the ConfigMap currently stored in the watcher. The
// returned pointer is nil when no object is stored; the error is always nil.
func (w *watcher) GetConfigMap() (*corev1.ConfigMap, error) {
	current := w.object
	return current, nil
}
// watchConfigmap starts an informer restricted to the single ConfigMap named
// by key (in "namespace/name" form, as parsed by k8s.ParseNameNS).
//
// Every add, effective update and delete stores the new state in the returned
// watcher and sends a human readable reason on eventCh. Closing stopCh stops
// the informer and releases any handler blocked on an eventCh send.
//
// An error is returned only when key cannot be parsed.
func watchConfigmap(key string, eventCh chan string, stopCh chan struct{}, kubeClient kubernetes.Interface) (*watcher, error) {
	ns, name, err := k8s.ParseNameNS(key)
	if err != nil {
		return nil, err
	}

	// Restrict the list/watch to the namespace and to the object name.
	kubeInformerFactory := kubeinformers.NewFilteredSharedInformerFactory(kubeClient, 0, ns,
		func(options *metav1.ListOptions) {
			options.FieldSelector = fields.OneTermEqualSelector("metadata.name", name).String()
		},
	)

	w := &watcher{}
	informer := kubeInformerFactory.Core().V1().ConfigMaps().Informer()

	// notify delivers reason to the consumer but gives up once stopCh is
	// closed, so a handler never blocks forever on the unbuffered channel
	// after the consumer has gone away.
	notify := func(reason string) {
		select {
		case eventCh <- reason:
		case <-stopCh:
		}
	}

	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			cm, ok := obj.(*corev1.ConfigMap)
			if !ok {
				klog.Warningf("Ignoring add event with unexpected object type %T", obj)
				return
			}

			w.object = cm
			notify("adding configmap")
		},
		UpdateFunc: func(old, cur interface{}) {
			// Resyncs and no-op patches only bump ResourceVersion; skip them.
			if cmp.Equal(old, cur,
				cmpopts.IgnoreFields(metav1.ObjectMeta{}, "ResourceVersion")) {
				return
			}

			cm, ok := cur.(*corev1.ConfigMap)
			if !ok {
				klog.Warningf("Ignoring update event with unexpected object type %T", cur)
				return
			}

			w.object = cm
			notify(fmt.Sprintf("updating configmap to %v", cm.Data))
		},
		DeleteFunc: func(obj interface{}) {
			// Only one object is tracked, so a delete always clears it, no
			// matter whether obj is the ConfigMap itself or a
			// cache.DeletedFinalStateUnknown tombstone. Exactly one event is
			// emitted per delete.
			w.object = nil
			notify("configmap removed")
		},
	})

	// Start launches the informers in their own goroutines and returns
	// immediately, so no extra goroutine is needed here.
	kubeInformerFactory.Start(stopCh)

	return w, nil
}
// ---------------------------- boilerplate ------------------------------------------------------
// createApiserverClient builds a Kubernetes clientset and verifies that the
// API server is reachable before returning it.
//
// apiserverHost is the URL of the API server in the format
// protocol://address:port/pathPrefix and kubeConfig is the location of a
// kubeconfig file. Both values are handed to clientcmd.BuildConfigFromFlags:
// if defined, the kubeconfig file is loaded first and the API server URL read
// from it is then optionally overridden by apiserverHost. If neither value is
// passed in, we assume the controller runs inside Kubernetes and fallback to
// the in-cluster config; if that is missing or fails, we fallback to the
// default config.
//
// Reachability is checked by requesting the server version with exponential
// backoff. When every attempt fails, the error of the last attempt is
// returned.
func createApiserverClient(apiserverHost, kubeConfig string) (*kubernetes.Clientset, error) {
	restCfg, err := clientcmd.BuildConfigFromFlags(apiserverHost, kubeConfig)
	if err != nil {
		return nil, err
	}

	klog.Infof("Creating API client for %s", restCfg.Host)

	clientset, err := kubernetes.NewForConfig(restCfg)
	if err != nil {
		return nil, err
	}

	var (
		serverVersion *discovery.Info
		lastErr       error
		retries       int
	)

	// The client may fail to connect to the API server in the first request.
	// https://github.com/kubernetes/ingress-nginx/issues/1968
	backoff := wait.Backoff{
		Steps:    10,
		Duration: 1 * time.Second,
		Factor:   1.5,
		Jitter:   0.1,
	}

	// discover performs one version request; failures are recorded and
	// reported as "not done yet" so the backoff keeps retrying.
	discover := func() (bool, error) {
		info, discoverErr := clientset.Discovery().ServerVersion()
		if discoverErr != nil {
			lastErr = discoverErr
			klog.V(2).Infof("Unexpected error discovering Kubernetes version (attempt %v): %v", retries, discoverErr)
			retries++
			return false, nil
		}

		serverVersion = info
		return true, nil
	}

	klog.V(2).Info("Trying to discover Kubernetes version")

	// A non-nil result means the backoff ran out of steps (ErrWaitTimeout);
	// surface the underlying connection error instead.
	if waitErr := wait.ExponentialBackoff(backoff, discover); waitErr != nil {
		return nil, lastErr
	}

	// this should not happen, warn the user
	if retries > 0 {
		klog.Warningf("Initial connection to the Kubernetes API server was retried %d times.", retries)
	}

	klog.Infof("Running in Kubernetes cluster version v%v.%v (%v) - git (%v) commit %v - platform %v",
		serverVersion.Major, serverVersion.Minor, serverVersion.GitVersion,
		serverVersion.GitTreeState, serverVersion.GitCommit, serverVersion.Platform)

	return clientset, nil
}
// handleFatalInitError reports an unrecoverable initialization error with a
// verbose, user-oriented message. It logs through klog.Fatalf, which
// terminates the process.
func handleFatalInitError(err error) {
	const fatalInitFormat = "Error while initiating a connection to the Kubernetes API server. " +
		"This could mean the cluster is misconfigured " +
		"(e.g. it has invalid API server certificates or Service Accounts configuration). " +
		"Reason: %s\n" +
		"Refer to the troubleshooting guide for more information: https://kubernetes.github.io/ingress-nginx/troubleshooting/"

	klog.Fatalf(fatalInitFormat, err)
}
@aledbf
Copy link
Author

aledbf commented Jun 29, 2019

  1. kubectl proxy
  2. run the watcher: go run watch-single-object.go
  3. create the configmap and watch the change
echo "
kind: ConfigMap
apiVersion: v1
metadata:
  name: test
data:
  X: '1'
" | kubectl apply -f -
  4. change the configmap values
kubectl patch configmap/test --type merge -p '{"data":{"X":"something else"}}'
  5. if the same patch is applied multiple times, we should not see any update in the console

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment