Skip to content

Instantly share code, notes, and snippets.

@fiunchinho
Created August 15, 2017 11:54
Show Gist options
  • Save fiunchinho/53cc88107949eba40b85f9284238394d to your computer and use it in GitHub Desktop.
Save fiunchinho/53cc88107949eba40b85f9284238394d to your computer and use it in GitHub Desktop.
Controller that whitelists IPs on Ingress objects
package main
import (
"flag"
"fmt"
"log"
"reflect"
"time"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/pkg/apis/extensions/v1beta1"
"k8s.io/client-go/tools/clientcmd"
"github.com/golang/glog"
"github.com/fiunchinho/dmz-controller/whitelist"
"strings"
"k8s.io/client-go/pkg/api/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// DMZProvidersAnnotation is the Ingress annotation that opts an Ingress into
// DMZ management; its value is a comma-separated list of provider names.
const DMZProvidersAnnotation = "armesto.net/ingress"
// DMZConfigMapName is the name of the ConfigMap (looked up in the Ingress's
// namespace) mapping provider names to whitelisted IP ranges.
const DMZConfigMapName = "dmz-controller"
// IngressWhitelistAnnotation is the standard ingress-controller annotation
// holding the comma-separated source-range whitelist that this controller writes.
const IngressWhitelistAnnotation = "ingress.kubernetes.io/whitelist-source-range"
var (
// queue is a queue of resources to be processed. It performs exponential
// backoff rate limiting, with a minimum retry period of 5 seconds and a
// maximum of 1 minute.
queue = workqueue.NewRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(time.Second*5, time.Minute))
// stopCh can be used to stop all the informer, as well as control loops
// within the application.
stopCh = make(chan struct{})
// sharedFactory is a shared informer factory that is used as a cache for
// items in the API server. It saves each informer listing and watching the
// same resources independently of each other, thus providing more up to
// date results with less 'effort'
sharedFactory informers.SharedInformerFactory
// client is a Kubernetes API client, set in main() and used by applyWhiteList
// to fetch ConfigMaps and update Ingress resources.
client kubernetes.Interface
)
func main() {
// When running as a pod in-cluster, a kubeconfig is not needed. Instead this will make use of the service account injected into the pod.
// However, allow the use of a local kubeconfig as this can make local development & testing easier.
kubeconfig := flag.String("kubeconfig", "", "Path to a kubeconfig file")
// We log to stderr because glog will default to logging to a file.
// By setting this debugging is easier via `kubectl logs`
flag.Set("logtostderr", "true")
flag.Parse()
// Build the client config - optionally using a provided kubeconfig file.
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
glog.Fatalf("Failed to load client config: %v", err)
}
// Construct the Kubernetes client
client, err = kubernetes.NewForConfig(config)
if err != nil {
log.Fatalf("Error creating kubernetes client: %s", err.Error())
}
log.Printf("Created Kubernetes client.")
// we use a shared informer from the informer factory, to save calls to the
// API as we grow our application and so state is consistent between our
// control loops. We set a resync period of 30 seconds, in case any
// create/replace/update/delete operations are missed when watching
sharedFactory = informers.NewSharedInformerFactory(client, time.Second*30)
informer := sharedFactory.Extensions().V1beta1().Ingresses().Informer()
// we add a new event handler, watching for changes to API resources.
informer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: enqueue,
UpdateFunc: func(old, cur interface{}) {
if !reflect.DeepEqual(old, cur) {
enqueue(cur)
}
},
DeleteFunc: enqueue,
},
)
// start the informer. This will cause it to begin receiving updates from
// the configured API server and firing event handlers in response.
sharedFactory.Start(stopCh)
log.Printf("Started informer factory.")
// wait for the informer cache to finish performing it's initial applyWhiteList of resources
if !cache.WaitForCacheSync(stopCh, informer.HasSynced) {
log.Fatalf("error waiting for informer cache to applyWhiteList: %s", err.Error())
}
log.Printf("Finished populating shared informers cache.")
// here we start just one worker reading objects off the queue. If you
// wanted to parallelize this, you could start many instances of the worker
// function, then ensure your application handles concurrency correctly.
for {
// we read a message off the queue
key, shutdown := queue.Get()
// if the queue has been shut down, we should exit the work queue here
if shutdown {
stopCh <- struct{}{}
return
}
// convert the queue item into a string. If it's not a string, we'll
// simply discard it as invalid data and log a message.
var strKey string
var ok bool
if strKey, ok = key.(string); !ok {
runtime.HandleError(fmt.Errorf("key in queue should be of type string but got %T. discarding", key))
return
}
// we define a function here to process a queue item, so that we can
// use 'defer' to make sure the message is marked as Done on the queue
func(key string) {
defer queue.Done(key)
// attempt to split the 'key' into namespace and object name
namespace, name, err := cache.SplitMetaNamespaceKey(strKey)
if err != nil {
runtime.HandleError(fmt.Errorf("error splitting meta namespace key into parts: %s", err.Error()))
return
}
log.Printf("Read key '%s/%s' off workqueue. Fetching from cache...", namespace, name)
// retrieve the latest version in the cache of this alert
obj, err := sharedFactory.Extensions().V1beta1().Ingresses().Lister().Ingresses(namespace).Get(name)
if err != nil {
runtime.HandleError(fmt.Errorf("error getting object '%s/%s' from api: %s", namespace, name, err.Error()))
return
}
//configMap, err := sharedFactory.Core().V1().ConfigMaps().Lister().ConfigMaps(namespace).Get(DMZConfigMapName)
configMap, err := client.CoreV1().ConfigMaps(namespace).Get(DMZConfigMapName, meta_v1.GetOptions{})
if err != nil {
runtime.HandleError(fmt.Errorf("error getting dmz configmap: %s", err.Error()))
return
}
log.Printf("Got '%s/%s' object from cache.", namespace, name)
// attempt to applyWhiteList the current state of the world with the desired!
// If applyWhiteList returns an error, we skip calling `queue.Forget`,
// thus causing the resource to be requeued at a later time.
if err := applyWhiteList(obj, configMap); err != nil {
runtime.HandleError(fmt.Errorf("error processing item '%s/%s': %s", namespace, name, err.Error()))
return
}
log.Printf("Finished processing '%s/%s' successfully! Removing from queue.", namespace, name)
// as we managed to process this successfully, we can forget it
// from the work queue altogether.
queue.Forget(key)
}(strKey)
}
}
// applyWhiteList reconciles the whitelist-source-range annotation on the
// given Ingress. It resolves the provider names listed in the Ingress's
// DMZProvidersAnnotation against the provider->IP-ranges mapping in the
// dmz-controller ConfigMap, preserves any manually-added IPs (those present
// in the whitelist annotation but not previously written by this controller),
// and updates the Ingress via the API.
//
// It is called whenever the controller starts, whenever the resource changes,
// and periodically every resyncPeriod. On error the caller re-queues the item
// to be tried again later, so the update is applied *at least once*.
func applyWhiteList(ingress *v1beta1.Ingress, configmap *v1.ConfigMap) error {
	if ingress.Annotations == nil {
		ingress.Annotations = make(map[string]string)
	}
	provider, ok := ingress.Annotations[DMZProvidersAnnotation]
	if !ok {
		// Ingress has not opted in to DMZ management; nothing to do.
		return nil
	}
	// Start from the IPs currently on the whitelist annotation, then subtract
	// whatever this controller wrote last time (tracked in the "dmz-controller"
	// annotation) so only manually-added IPs remain.
	currentWhitelistedIps := whitelist.NewWhitelistFromString(ingress.Annotations[IngressWhitelistAnnotation])
	if previous, ok := ingress.Annotations["dmz-controller"]; ok {
		currentWhitelistedIps.Minus(whitelist.NewWhitelistFromString(previous))
	}
	// Compute the controller-managed set, then the full desired whitelist.
	whitelistToApply := getWhitelistFromProvider(provider, configmap.Data)
	managed := whitelistToApply.ToString()
	whitelistToApply.Merge(currentWhitelistedIps)
	desired := whitelistToApply.ToString()
	// IMPROVEMENT: the original called the Update API unconditionally, issuing
	// a write on every resync even when nothing changed. Skip the API call
	// when the annotations already hold the desired values.
	if ingress.Annotations["dmz-controller"] == managed && ingress.Annotations[IngressWhitelistAnnotation] == desired {
		return nil
	}
	ingress.Annotations["dmz-controller"] = managed
	ingress.Annotations[IngressWhitelistAnnotation] = desired
	// Persist the updated annotations. If this request fails, the item is
	// requeued and retried, so the annotations may be written more than once.
	if _, err := client.ExtensionsV1beta1().Ingresses(ingress.Namespace).Update(ingress); err != nil {
		return fmt.Errorf("error saving update to Ingress resource: %s", err.Error())
	}
	log.Printf("Saved update to Ingress resource '%s/%s'", ingress.Namespace, ingress.Name)
	// we didn't encounter any errors, so we return nil to allow the caller
	// to 'forget' this item from the queue altogether.
	return nil
}
// getWhitelistFromProvider resolves a comma-separated list of provider names
// into a single whitelist by merging each provider's entry from the given
// name->IP-ranges mapping. Unknown provider names contribute an empty entry.
func getWhitelistFromProvider(provider string, whitelistProviders map[string]string) *whitelist.Whitelist {
	merged := whitelist.NewEmptyWhitelist()
	for _, name := range strings.Split(provider, ",") {
		// Tolerate whitespace around provider names, e.g. "foo, bar".
		entry := whitelistProviders[strings.TrimSpace(name)]
		merged.Merge(whitelist.NewWhitelistFromString(entry))
	}
	return merged
}
// enqueue converts 'obj' into a 'namespace/name' key and adds it to the
// workqueue. The object being added must be of type metav1.Object,
// metav1.ObjectAccessor or cache.ExplicitKey.
func enqueue(obj interface{}) {
	// We queue the key rather than the object itself: the item may be
	// processed much later than now, so the worker should fetch a fresh copy
	// from the cache when it starts. Keying also collapses duplicate adds of
	// the same item while it sits in the queue.
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if err == nil {
		queue.Add(key)
		return
	}
	runtime.HandleError(fmt.Errorf("error obtaining key for object being enqueue: %s", err.Error()))
}
@munnerz
Copy link

munnerz commented Aug 15, 2017

So the resyncing functionality shouldn't be relied upon for this sort of thing as far as I can tell - the resync period is the period in which the client will effectively perform a full List against the apiserver instead of just watching for deltas. I've noticed this behaviour too (not getting the Update function called every resyncPeriod), and all I can think the reason for this is, is because there is some internal logic in the cache that sees the item hasn't changed so doesn't call your Update method. The whole purpose of the resync is to catch up with missed events, i.e. if for some reason one of these deltas is missed, a full resync will be performed every X seconds to make up for that.

In your case, instead, you should set up an informer that watches ConfigMap objects for updates/deletions/creations, and if one occurs, you should look up the relevant Ingress resources and add that Ingress resources DeletionHandlingMetaNamespaceKey to your workqueue to be 're synced'. By doing it this way with the DeletionHandlingMetaNamespaceKey, you also save having to implement logic specific to ConfigMap updates (and instead everything is just a 'sync' between desired and actual state)

@fiunchinho
Copy link
Author

Should I use two queues for that?

@fiunchinho
Copy link
Author

Testing mentions @munnerz

@munnerz
Copy link

munnerz commented Aug 15, 2017

@fiunchinho you shouldn't need to. In the add/change/delete function for the ConfigMap you should 'lookup' which Ingress resources are affected/might possibly be interested by a change to that ConfigMap and correspondingly add those Ingresses to the standard workqueue so that they can be reprocessed/checked to ensure they are up to date.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment