@fiunchinho
Created August 15, 2017 11:54
Controller that whitelists IPs on Ingress objects
package main

import (
	"flag"
	"fmt"
	"log"
	"reflect"
	"strings"
	"time"

	"github.com/fiunchinho/dmz-controller/whitelist"
	"github.com/golang/glog"

	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/pkg/api/v1"
	"k8s.io/client-go/pkg/apis/extensions/v1beta1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/workqueue"
)
// DMZProvidersAnnotation is the Ingress annotation listing the whitelist providers to apply.
const DMZProvidersAnnotation = "armesto.net/ingress"
// DMZConfigMapName is the name of the ConfigMap that holds the whitelist for each provider.
const DMZConfigMapName = "dmz-controller"
// IngressWhitelistAnnotation is the annotation read by the ingress controller to restrict source IPs.
const IngressWhitelistAnnotation = "ingress.kubernetes.io/whitelist-source-range"
var (
	// queue is a queue of resources to be processed. It performs exponential
	// backoff rate limiting, with a minimum retry period of 5 seconds and a
	// maximum of 1 minute.
	queue = workqueue.NewRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(time.Second*5, time.Minute))

	// stopCh can be used to stop all the informers, as well as control loops
	// within the application.
	stopCh = make(chan struct{})

	// sharedFactory is a shared informer factory that is used as a cache for
	// items in the API server. It saves each informer from listing and watching
	// the same resources independently of the others, thus providing more up to
	// date results with less 'effort'.
	sharedFactory informers.SharedInformerFactory

	// client is a Kubernetes API client used to read ConfigMaps and update Ingress resources.
	client kubernetes.Interface
)
func main() {
	// When running as a pod in-cluster, a kubeconfig is not needed. Instead this will make use of the service account injected into the pod.
	// However, allow the use of a local kubeconfig as this can make local development & testing easier.
	kubeconfig := flag.String("kubeconfig", "", "Path to a kubeconfig file")

	// We log to stderr because glog will default to logging to a file.
	// By setting this, debugging is easier via `kubectl logs`.
	flag.Set("logtostderr", "true")
	flag.Parse()

	// Build the client config - optionally using a provided kubeconfig file.
	config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
	if err != nil {
		glog.Fatalf("Failed to load client config: %v", err)
	}

	// Construct the Kubernetes client.
	client, err = kubernetes.NewForConfig(config)
	if err != nil {
		log.Fatalf("Error creating kubernetes client: %s", err.Error())
	}
	log.Printf("Created Kubernetes client.")

	// We use a shared informer from the informer factory to save calls to the
	// API as we grow our application and so state is consistent between our
	// control loops. We set a resync period of 30 seconds, in case any
	// create/replace/update/delete operations are missed when watching.
	sharedFactory = informers.NewSharedInformerFactory(client, time.Second*30)
	informer := sharedFactory.Extensions().V1beta1().Ingresses().Informer()

	// We add a new event handler, watching for changes to API resources.
	informer.AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc: enqueue,
			UpdateFunc: func(old, cur interface{}) {
				if !reflect.DeepEqual(old, cur) {
					enqueue(cur)
				}
			},
			DeleteFunc: enqueue,
		},
	)

	// Start the informer. This will cause it to begin receiving updates from
	// the configured API server and firing event handlers in response.
	sharedFactory.Start(stopCh)
	log.Printf("Started informer factory.")
	// Wait for the informer cache to finish performing its initial sync of resources.
	if !cache.WaitForCacheSync(stopCh, informer.HasSynced) {
		log.Fatalf("error waiting for informer cache to sync")
	}
	log.Printf("Finished populating shared informers cache.")
	// Here we start just one worker reading objects off the queue. If you
	// wanted to parallelize this, you could start many instances of the worker
	// function, then ensure your application handles concurrency correctly.
	for {
		// Read a message off the queue.
		key, shutdown := queue.Get()

		// If the queue has been shut down, we should exit the work loop here.
		if shutdown {
			stopCh <- struct{}{}
			return
		}

		// Convert the queue item into a string. If it's not a string, we
		// simply discard it as invalid data and log a message.
		var strKey string
		var ok bool
		if strKey, ok = key.(string); !ok {
			runtime.HandleError(fmt.Errorf("key in queue should be of type string but got %T. discarding", key))
			// Mark the invalid item as done and move on instead of exiting the worker loop.
			queue.Done(key)
			continue
		}
		// We define a function here to process a queue item, so that we can
		// use 'defer' to make sure the message is marked as Done on the queue.
		func(key string) {
			defer queue.Done(key)

			// Attempt to split the 'key' into namespace and object name.
			namespace, name, err := cache.SplitMetaNamespaceKey(key)
			if err != nil {
				runtime.HandleError(fmt.Errorf("error splitting meta namespace key into parts: %s", err.Error()))
				return
			}
			log.Printf("Read key '%s/%s' off workqueue. Fetching from cache...", namespace, name)

			// Retrieve the latest version of this Ingress from the informer cache.
			obj, err := sharedFactory.Extensions().V1beta1().Ingresses().Lister().Ingresses(namespace).Get(name)
			if err != nil {
				runtime.HandleError(fmt.Errorf("error getting object '%s/%s' from api: %s", namespace, name, err.Error()))
				return
			}

			// Fetch the ConfigMap holding the provider whitelists from the API server.
			//configMap, err := sharedFactory.Core().V1().ConfigMaps().Lister().ConfigMaps(namespace).Get(DMZConfigMapName)
			configMap, err := client.CoreV1().ConfigMaps(namespace).Get(DMZConfigMapName, meta_v1.GetOptions{})
			if err != nil {
				runtime.HandleError(fmt.Errorf("error getting dmz configmap: %s", err.Error()))
				return
			}
			log.Printf("Got '%s/%s' object from cache.", namespace, name)

			// Attempt to reconcile the current state of the world with the desired one.
			// If applyWhiteList returns an error, we skip calling `queue.Forget`,
			// thus causing the resource to be requeued at a later time.
			if err := applyWhiteList(obj, configMap); err != nil {
				runtime.HandleError(fmt.Errorf("error processing item '%s/%s': %s", namespace, name, err.Error()))
				return
			}
			log.Printf("Finished processing '%s/%s' successfully! Removing from queue.", namespace, name)

			// As we managed to process this item successfully, we can forget it
			// from the work queue altogether.
			queue.Forget(key)
		}(strKey)
	}
}
// applyWhiteList is called whenever this controller starts, whenever an Ingress changes, and also periodically every resyncPeriod.
// If an error occurs here, we return it, which causes the calling loop to re-queue the item to be tried again later.
func applyWhiteList(ingress *v1beta1.Ingress, configmap *v1.ConfigMap) error {
	if ingress.Annotations == nil {
		ingress.Annotations = make(map[string]string)
	}

	// If the Ingress doesn't declare any whitelist provider, there is nothing to do.
	provider, ok := ingress.Annotations[DMZProvidersAnnotation]
	if !ok {
		return nil
	}

	// Start from the IPs currently whitelisted on the Ingress, removing the ones
	// this controller added on a previous run so they can be recalculated.
	currentWhitelistedIps := whitelist.NewWhitelistFromString(ingress.Annotations[IngressWhitelistAnnotation])
	if _, ok := ingress.Annotations["dmz-controller"]; ok {
		currentWhitelistedIps.Minus(whitelist.NewWhitelistFromString(ingress.Annotations["dmz-controller"]))
	}

	// Build the whitelist for the requested providers, remember what this controller
	// contributed, and merge in the IPs that were whitelisted by other means.
	whitelistToApply := getWhitelistFromProvider(provider, configmap.Data)
	ingress.Annotations["dmz-controller"] = whitelistToApply.ToString()
	whitelistToApply.Merge(currentWhitelistedIps)
	ingress.Annotations[IngressWhitelistAnnotation] = whitelistToApply.ToString()

	// Persist the updated annotations. If this request fails, the item will be
	// requeued and the update attempted again, so this control loop updates the
	// Ingress *at least once*, not *at most once*.
	if _, err := client.ExtensionsV1beta1().Ingresses(ingress.Namespace).Update(ingress); err != nil {
		return fmt.Errorf("error saving update to Ingress resource: %s", err.Error())
	}
	log.Printf("Saved update to Ingress resource '%s/%s'", ingress.Namespace, ingress.Name)

	// We didn't encounter any errors, so we return nil to allow the caller
	// to 'forget' this item from the queue altogether.
	return nil
}
// getWhitelistFromProvider builds the whitelist to apply by merging the whitelists
// of every provider listed (comma-separated) in the annotation value.
func getWhitelistFromProvider(provider string, whitelistProviders map[string]string) *whitelist.Whitelist {
	whitelistToApply := whitelist.NewEmptyWhitelist()
	for _, value := range strings.Split(provider, ",") {
		providerWhitelist := whitelist.NewWhitelistFromString(whitelistProviders[strings.TrimSpace(value)])
		whitelistToApply.Merge(providerWhitelist)
	}
	return whitelistToApply
}
// enqueue will add an object 'obj' into the workqueue. The object being added
// must be of type metav1.Object, metav1.ObjectAccessor or cache.ExplicitKey.
func enqueue(obj interface{}) {
	// DeletionHandlingMetaNamespaceKeyFunc will convert an object into a
	// 'namespace/name' string. We do this because our item may be processed
	// much later than now, and so we want to ensure it gets a fresh copy of
	// the resource when it starts. Also, this allows us to keep adding the
	// same item into the work queue without duplicates building up.
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if err != nil {
		runtime.HandleError(fmt.Errorf("error obtaining key for object being enqueued: %s", err.Error()))
		return
	}

	// Add the item to the queue.
	queue.Add(key)
}

munnerz commented Aug 15, 2017

@fiunchinho you shouldn't need to. In the add/change/delete handler for the ConfigMap you should look up which Ingress resources are affected by (or might be interested in) a change to that ConfigMap, and correspondingly add those Ingresses to the standard workqueue so that they can be reprocessed and checked to ensure they are up to date.
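
A minimal sketch of that approach, assuming the ConfigMap informer is created from the same shared factory before sharedFactory.Start(stopCh) and that "k8s.io/apimachinery/pkg/labels" is added to the imports; the enqueueAffectedIngresses helper is illustrative and not part of the gist:

	// Inside main(), before sharedFactory.Start(stopCh): watch ConfigMaps and
	// re-enqueue the Ingresses that might be affected by a change to one of them.
	configMapInformer := sharedFactory.Core().V1().ConfigMaps().Informer()
	configMapInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    enqueueAffectedIngresses,
		UpdateFunc: func(old, cur interface{}) { enqueueAffectedIngresses(cur) },
		DeleteFunc: enqueueAffectedIngresses,
	})

	// enqueueAffectedIngresses is a hypothetical helper: for a changed dmz-controller
	// ConfigMap, it lists the Ingresses in the same namespace and re-enqueues those
	// carrying the DMZProvidersAnnotation so the worker loop reprocesses them.
	func enqueueAffectedIngresses(obj interface{}) {
		configMap, ok := obj.(*v1.ConfigMap)
		if !ok || configMap.Name != DMZConfigMapName {
			return
		}
		ingresses, err := sharedFactory.Extensions().V1beta1().Ingresses().Lister().Ingresses(configMap.Namespace).List(labels.Everything())
		if err != nil {
			runtime.HandleError(fmt.Errorf("error listing ingresses for configmap '%s/%s': %s", configMap.Namespace, configMap.Name, err.Error()))
			return
		}
		for _, ingress := range ingresses {
			if _, ok := ingress.Annotations[DMZProvidersAnnotation]; ok {
				enqueue(ingress)
			}
		}
	}

With a handler like this, editing the dmz-controller ConfigMap would re-trigger applyWhiteList for every annotated Ingress in that namespace, rather than waiting for the 30-second resync.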
