@fiunchinho
Created August 15, 2017 11:54
Controller that whitelists IPs on Ingress objects
package main

import (
	"flag"
	"fmt"
	"log"
	"reflect"
	"strings"
	"time"

	"github.com/fiunchinho/dmz-controller/whitelist"
	"github.com/golang/glog"

	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/pkg/api/v1"
	"k8s.io/client-go/pkg/apis/extensions/v1beta1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/workqueue"
)
// DMZProvidersAnnotation is the Ingress annotation listing the whitelist providers to apply.
const DMZProvidersAnnotation = "armesto.net/ingress"

// DMZConfigMapName is the name of the ConfigMap that holds the whitelist for each provider.
const DMZConfigMapName = "dmz-controller"

// IngressWhitelistAnnotation is the annotation read by the ingress controller to whitelist source IP ranges.
const IngressWhitelistAnnotation = "ingress.kubernetes.io/whitelist-source-range"
var (
	// queue is a queue of resources to be processed. It performs exponential
	// backoff rate limiting, with a minimum retry period of 5 seconds and a
	// maximum of 1 minute.
	queue = workqueue.NewRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(time.Second*5, time.Minute))

	// stopCh can be used to stop all the informers, as well as the control loops
	// within the application.
	stopCh = make(chan struct{})

	// sharedFactory is a shared informer factory that is used as a cache for
	// items in the API server. It saves each informer from listing and watching
	// the same resources independently of the others, providing more up-to-date
	// results with less 'effort'.
	sharedFactory informers.SharedInformerFactory

	// client is a standard Kubernetes API client.
	client kubernetes.Interface
)
func main() {
	// When running as a pod in-cluster, a kubeconfig is not needed. Instead this will make use of the service account injected into the pod.
	// However, allow the use of a local kubeconfig as this can make local development & testing easier.
	kubeconfig := flag.String("kubeconfig", "", "Path to a kubeconfig file")

	// We log to stderr because glog will default to logging to a file.
	// Setting this makes debugging easier via `kubectl logs`.
	flag.Set("logtostderr", "true")
	flag.Parse()

	// Build the client config - optionally using a provided kubeconfig file.
	config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
	if err != nil {
		glog.Fatalf("Failed to load client config: %v", err)
	}

	// Construct the Kubernetes client.
	client, err = kubernetes.NewForConfig(config)
	if err != nil {
		log.Fatalf("Error creating kubernetes client: %s", err.Error())
	}
	log.Printf("Created Kubernetes client.")

	// We use a shared informer from the informer factory to save calls to the
	// API as the application grows, and so state is consistent between our
	// control loops. We set a resync period of 30 seconds, in case any
	// create/replace/update/delete operations are missed when watching.
	sharedFactory = informers.NewSharedInformerFactory(client, time.Second*30)

	informer := sharedFactory.Extensions().V1beta1().Ingresses().Informer()
	// We add a new event handler, watching for changes to API resources.
	informer.AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc: enqueue,
			UpdateFunc: func(old, cur interface{}) {
				if !reflect.DeepEqual(old, cur) {
					enqueue(cur)
				}
			},
			DeleteFunc: enqueue,
		},
	)

	// Start the informers. This will cause them to begin receiving updates from
	// the configured API server and firing event handlers in response.
	sharedFactory.Start(stopCh)
	log.Printf("Started informer factory.")

	// Wait for the informer cache to finish its initial sync of resources.
	if !cache.WaitForCacheSync(stopCh, informer.HasSynced) {
		log.Fatalf("error waiting for informer cache to sync")
	}
	log.Printf("Finished populating shared informers cache.")

	// Here we start just one worker reading objects off the queue. If you
	// wanted to parallelize this, you could start many instances of the worker
	// function, then ensure your application handles concurrency correctly.
	for {
		// Read a message off the queue.
		key, shutdown := queue.Get()

		// If the queue has been shut down, we should exit the work loop here.
		if shutdown {
			stopCh <- struct{}{}
			return
		}

		// Convert the queue item into a string. If it's not a string, we
		// simply discard it as invalid data and log a message.
		var strKey string
		var ok bool
		if strKey, ok = key.(string); !ok {
			runtime.HandleError(fmt.Errorf("key in queue should be of type string but got %T. discarding", key))
			queue.Done(key)
			continue
		}

		// We define a function here to process a queue item, so that we can
		// use 'defer' to make sure the message is marked as Done on the queue.
		func(key string) {
			defer queue.Done(key)

			// Attempt to split the 'key' into namespace and object name.
			namespace, name, err := cache.SplitMetaNamespaceKey(strKey)
			if err != nil {
				runtime.HandleError(fmt.Errorf("error splitting meta namespace key into parts: %s", err.Error()))
				return
			}
			log.Printf("Read key '%s/%s' off workqueue. Fetching from cache...", namespace, name)

			// Retrieve the latest version of this Ingress from the cache.
			obj, err := sharedFactory.Extensions().V1beta1().Ingresses().Lister().Ingresses(namespace).Get(name)
			if err != nil {
				runtime.HandleError(fmt.Errorf("error getting object '%s/%s' from cache: %s", namespace, name, err.Error()))
				return
			}

			//configMap, err := sharedFactory.Core().V1().ConfigMaps().Lister().ConfigMaps(namespace).Get(DMZConfigMapName)
			configMap, err := client.CoreV1().ConfigMaps(namespace).Get(DMZConfigMapName, meta_v1.GetOptions{})
			if err != nil {
				runtime.HandleError(fmt.Errorf("error getting dmz configmap: %s", err.Error()))
				return
			}
			log.Printf("Got '%s/%s' object from cache.", namespace, name)

			// Attempt to reconcile the current state of the world with the desired
			// state by calling applyWhiteList. If it returns an error, we skip
			// calling `queue.Forget`, causing the resource to be requeued later.
			if err := applyWhiteList(obj, configMap); err != nil {
				runtime.HandleError(fmt.Errorf("error processing item '%s/%s': %s", namespace, name, err.Error()))
				return
			}
			log.Printf("Finished processing '%s/%s' successfully! Removing from queue.", namespace, name)

			// As we managed to process this item successfully, we can forget it
			// from the work queue altogether.
			queue.Forget(key)
		}(strKey)
	}
}
// applyWhiteList is called whenever this controller starts, whenever the resource changes, and also periodically every resyncPeriod.
// If an error occurs here, we return it, which causes the calling function to re-queue the item to be tried again later.
func applyWhiteList(ingress *v1beta1.Ingress, configmap *v1.ConfigMap) error {
	if ingress.Annotations == nil {
		ingress.Annotations = make(map[string]string)
	}

	// Only process Ingresses that declare one or more whitelist providers.
	provider, ok := ingress.Annotations[DMZProvidersAnnotation]
	if !ok {
		return nil
	}

	// Start from the whitelist currently on the Ingress and subtract whatever this
	// controller applied last time (recorded in the 'dmz-controller' annotation),
	// so that only manually added IPs are kept.
	currentWhitelistedIps := whitelist.NewWhitelistFromString(ingress.Annotations[IngressWhitelistAnnotation])
	if _, ok := ingress.Annotations["dmz-controller"]; ok {
		currentWhitelistedIps.Minus(whitelist.NewWhitelistFromString(ingress.Annotations["dmz-controller"]))
	}

	// Build the whitelist for the requested providers, remember it in the
	// 'dmz-controller' annotation, and merge it with the manually added IPs.
	whitelistToApply := getWhitelistFromProvider(provider, configmap.Data)
	ingress.Annotations["dmz-controller"] = whitelistToApply.ToString()
	whitelistToApply.Merge(currentWhitelistedIps)
	ingress.Annotations[IngressWhitelistAnnotation] = whitelistToApply.ToString()

	// Update the resource with the new annotations. If this request fails, the
	// item will be requeued and processed again, so the update is applied
	// *at least once*, not *at most once*.
	if _, err := client.ExtensionsV1beta1().Ingresses(ingress.Namespace).Update(ingress); err != nil {
		return fmt.Errorf("error saving update to Ingress resource: %s", err.Error())
	}
	log.Printf("Saved update to Ingress resource '%s/%s'", ingress.Namespace, ingress.Name)

	// We didn't encounter any errors, so we return nil to allow the caller
	// to 'forget' this item from the queue altogether.
	return nil
}
// getWhitelistFromProvider merges the whitelists of every comma-separated provider
// listed in the annotation, looking each provider up in the ConfigMap data.
func getWhitelistFromProvider(provider string, whitelistProviders map[string]string) *whitelist.Whitelist {
	whitelistToApply := whitelist.NewEmptyWhitelist()
	for _, value := range strings.Split(provider, ",") {
		providerWhitelist := whitelist.NewWhitelistFromString(whitelistProviders[strings.TrimSpace(value)])
		whitelistToApply.Merge(providerWhitelist)
	}
	return whitelistToApply
}
// enqueue will add an object 'obj' into the workqueue. The object being added
// must be of type metav1.Object, metav1.ObjectAccessor or cache.ExplicitKey.
func enqueue(obj interface{}) {
	// DeletionHandlingMetaNamespaceKeyFunc will convert an object into a
	// 'namespace/name' string. We do this because our item may be processed
	// much later than now, and so we want to ensure it gets a fresh copy of
	// the resource when it starts. Also, this allows us to keep adding the
	// same item into the work queue without duplicates building up.
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if err != nil {
		runtime.HandleError(fmt.Errorf("error obtaining key for object being enqueued: %s", err.Error()))
		return
	}
	// Add the item to the queue.
	queue.Add(key)
}
@fiunchinho (Author)
Line 149 doesn't work, throwing
E0815 10:32:32.411134 1 main.go:173] error getting dmz configmap: configmap "dmz-controller" not found

It works with line 150, but if I have dozens of Ingress objects, that means dozens of requests to the API server to fetch the same ConfigMap in a matter of seconds, right?

@munnerz commented Aug 15, 2017
Hey - so the issue here is that you've not actually told the SharedInformerFactory to watch for ConfigMap resources, so it has none in its cache. You have told it to watch for Ingress resources, and on this line: https://gist.github.com/fiunchinho/53cc88107949eba40b85f9284238394d#file-main-go-L99 you wait on it with the informer.HasSynced function. WaitForCacheSync is variadic, so you should be able to add the HasSynced function for a ConfigMap informer here too (even if that informer has no event handlers registered), i.e. something like this:

// Get references to informers for relevant types
informer := sharedFactory.Extensions().V1beta1().Ingresses().Informer()
cmInformer := sharedFactory.Core().V1().ConfigMaps().Informer()

// set up EventHandlers for any types that we need them for - this step is *optional* for
// types we don't actually need to register handlers for (e.g. in this case, we don't
// need to add an EventHandler for the ConfigMap type)
...

// by including both 'HasSynced' functions here, we make sure both the Ingress informer and ConfigMap informer have
// synced before continuing, thus making everything work as expected :)
if !cache.WaitForCacheSync(stopCh, informer.HasSynced, cmInformer.HasSynced) {
...
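
And once the ConfigMap informer has synced, the worker could read the ConfigMap from the shared cache rather than calling the API server directly - a minimal sketch that essentially re-enables the line commented out in main.go (namespace is the variable already in scope in the worker):

	// Inside the worker, after both informer caches have synced:
	// read the dmz-controller ConfigMap from the shared informer cache instead of
	// making a direct API request for every Ingress that is processed.
	configMap, err := sharedFactory.Core().V1().ConfigMaps().Lister().ConfigMaps(namespace).Get(DMZConfigMapName)
	if err != nil {
		runtime.HandleError(fmt.Errorf("error getting dmz configmap from cache: %s", err.Error()))
		return
	}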

@fiunchinho (Author)
That did the trick! I thought informers were needed only for resources I react to, but now I see the informer itself is what watches and caches a resource, and the event handlers attached to it are only needed when I want to react to changes.

Another thing I don't fully understand: I was under the impression that every 30 seconds I'd get all the Ingress objects stored in the API due to the resync period. From this guide:

Watches and Informers will “sync”. Periodically, they will deliver every matching object in the cluster to your Update method. This is good for cases where you may need to take additional action on the object, but sometimes you know there won't be more work to do.

But it seems that the code is only being executed if I create/update/delete an Ingress object. I'm interested in this behavior because the values in the ConfigMap may change over time, so having a resync period would apply the new values stored in the ConfigMap. What am I missing?

@munnerz commented Aug 15, 2017
So the resyncing functionality shouldn't be relied upon for this sort of thing as far as I can tell - the resync period is the period at which the client will effectively perform a full List against the apiserver instead of just watching for deltas. I've noticed this behaviour too (not getting the Update function called every resyncPeriod), and the only reason I can think of is that there is some internal logic in the cache that sees the item hasn't changed and so doesn't call your Update method. The whole purpose of the resync is to catch up with missed events, i.e. if for some reason one of these deltas is missed, a full resync performed every X seconds makes up for that.

In your case, instead, you should set up an informer that watches ConfigMap objects for updates/deletions/creations, and if one occurs, you should look up the relevant Ingress resources and add each Ingress resource's DeletionHandlingMetaNamespaceKey to your workqueue to be 're-synced'. By doing it this way with the DeletionHandlingMetaNamespaceKey, you also save having to implement logic specific to ConfigMap updates (instead everything is just a 'sync' between desired and actual state).
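
A rough sketch of that lookup, assuming a helper called enqueueIngressesForConfigMap (the name and the annotation filter are illustrative, not part of the gist; it also needs the k8s.io/apimachinery/pkg/labels import):

	// enqueueIngressesForConfigMap is a hypothetical helper: when the dmz-controller
	// ConfigMap changes, re-queue every Ingress in that namespace that carries the
	// DMZProvidersAnnotation, so the normal sync loop reapplies the whitelist.
	func enqueueIngressesForConfigMap(obj interface{}) {
		// Note: on deletions the handler may receive a cache.DeletedFinalStateUnknown
		// instead of a *v1.ConfigMap; this sketch simply ignores that case.
		cm, ok := obj.(*v1.ConfigMap)
		if !ok {
			return
		}
		if cm.Name != DMZConfigMapName {
			return
		}
		ingresses, err := sharedFactory.Extensions().V1beta1().Ingresses().Lister().Ingresses(cm.Namespace).List(labels.Everything())
		if err != nil {
			runtime.HandleError(fmt.Errorf("error listing ingresses for configmap '%s/%s': %s", cm.Namespace, cm.Name, err.Error()))
			return
		}
		for _, ing := range ingresses {
			if _, ok := ing.Annotations[DMZProvidersAnnotation]; ok {
				enqueue(ing)
			}
		}
	}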

@fiunchinho (Author)
Should I use two queues for that?

@munnerz commented Aug 15, 2017
@fiunchinho you shouldn't need to. In the add/update/delete function for the ConfigMap you should look up which Ingress resources are affected by (or might possibly be interested in) a change to that ConfigMap, and correspondingly add those Ingresses to the standard workqueue so that they can be reprocessed/checked to ensure they are up to date.
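
Wiring that in could look something like this (again only a sketch, reusing the hypothetical enqueueIngressesForConfigMap helper from above), so that ConfigMap changes feed the same workqueue the Ingress handlers already use:

	// Sketch: register event handlers on the ConfigMap informer so that changes to
	// the dmz-controller ConfigMap feed the same workqueue as Ingress events.
	cmInformer := sharedFactory.Core().V1().ConfigMaps().Informer()
	cmInformer.AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc: enqueueIngressesForConfigMap,
			UpdateFunc: func(old, cur interface{}) {
				if !reflect.DeepEqual(old, cur) {
					enqueueIngressesForConfigMap(cur)
				}
			},
			DeleteFunc: enqueueIngressesForConfigMap,
		},
	)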
