Skip to content

Instantly share code, notes, and snippets.

@thoro
Created August 5, 2017 15:45
Show Gist options
  • Save thoro/2e81754c253a85981e06d4fd6839eee7 to your computer and use it in GitHub Desktop.
Save thoro/2e81754c253a85981e06d4fd6839eee7 to your computer and use it in GitHub Desktop.
package watchers
import (
"time"
"reflect"
"git.itdata.at/creamfinance/kube-egress/utils"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/rest"
cache "k8s.io/client-go/tools/cache"
runtime "k8s.io/apimachinery/pkg/runtime"
)
type Operation int
const (
ADD Operation = iota
UPDATE
REMOVE
SYNCED
)
type Update struct {
Item interface{}
Op Operation
}
type Watcher struct {
controller cache.Controller
lister cache.Indexer
broadcaster *utils.Broadcaster
stopChannel chan struct{}
}
type UpdatesHandler interface {
OnItemUpdate(update *Update)
}
var (
watchers map[string]*Watcher
)
func (nw *Watcher) genericAddEventHandler(obj interface{}) {
nw.broadcaster.Notify(&Update{ Op: ADD, Item: obj })
}
func (nw *Watcher) genericDeleteEventHandler(obj interface{}) {
nw.broadcaster.Notify(&Update{ Op: REMOVE, Item: obj })
}
func (nw *Watcher) genericUpdateEventHandler(oldObj, newObj interface{}) {
if !reflect.DeepEqual(newObj, oldObj) {
nw.broadcaster.Notify(&Update{ Op: UPDATE, Item: newObj })
}
}
func (nw *Watcher) RegisterHandler(handler UpdatesHandler) {
nw.broadcaster.Add(utils.ListenerFunc(func(instance interface{}) {
handler.OnItemUpdate(instance.(*Update))
}))
}
func (nw *Watcher) HasSynced() bool {
return nw.controller.HasSynced()
}
func (nw *Watcher) List() []interface{} {
return nw.lister.List()
}
func StartItemWatcher(client rest.Interface, resyncPeriod time.Duration, rest_name string, obj runtime.Object) (*Watcher, error) {
if watchers == nil {
watchers = make(map[string]*Watcher, 0)
}
watcher := Watcher{}
eventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: watcher.genericAddEventHandler,
DeleteFunc: watcher.genericDeleteEventHandler,
UpdateFunc: watcher.genericUpdateEventHandler,
}
watcher.broadcaster = utils.NewBroadcaster()
lw := cache.NewListWatchFromClient(client, rest_name, metav1.NamespaceAll, fields.Everything())
watcher.lister, watcher.controller = cache.NewIndexerInformer(
lw,
obj,
resyncPeriod,
eventHandler,
cache.Indexers{
cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
},
)
watcher.stopChannel = make(chan struct{})
watchers[rest_name] = &watcher
go watcher.controller.Run(watcher.stopChannel)
return &watcher, nil
}
func GetItemWatcher(rest_name string) *Watcher {
if watcher, ok := watchers[rest_name]; ok {
return watcher
}
return nil
}
func StopItemWatcher(rest_name string) {
if watcher, ok := watchers[rest_name]; ok {
watcher.stopChannel <- struct{}{}
}
}
package watchers
import (
"git.itdata.at/creamfinance/kube-egress/app/creamfinance"
api "k8s.io/client-go/pkg/api/v1"
apiextensions "k8s.io/client-go/pkg/apis/extensions/v1beta1"
// "k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
listers "k8s.io/client-go/listers/core/v1"
)
func (nw *Watcher) ListEndpoints() []*api.Endpoints {
obj_list := nw.lister.List()
ep_instances := make([]*api.Endpoints, len(obj_list))
for i, ins := range obj_list {
ep_instances[i] = ins.(*api.Endpoints)
}
return ep_instances
}
func (nw *Watcher) ListNodes() []*api.Node {
obj_list := nw.lister.List()
ep_instances := make([]*api.Node, len(obj_list))
for i, ins := range obj_list {
ep_instances[i] = ins.(*api.Node)
}
return ep_instances
}
func (nw *Watcher) ListPods() []*api.Pod {
obj_list := nw.lister.List()
ep_instances := make([]*api.Pod, len(obj_list))
for i, ins := range obj_list {
ep_instances[i] = ins.(*api.Pod)
}
return ep_instances
}
func (nw *Watcher) ListServices() []*api.Service {
obj_list := nw.lister.List()
ep_instances := make([]*api.Service, len(obj_list))
for i, ins := range obj_list {
ep_instances[i] = ins.(*api.Service)
}
return ep_instances
}
func (nw *Watcher) ListNetworkPolicies() []*apiextensions.NetworkPolicy {
obj_list := nw.lister.List()
ep_instances := make([]*apiextensions.NetworkPolicy, len(obj_list))
for i, ins := range obj_list {
ep_instances[i] = ins.(*apiextensions.NetworkPolicy)
}
return ep_instances
}
func (nw *Watcher) ListPodIPs() []*creamfinance.PodIP {
obj_list := nw.lister.List()
ep_instances := make([]*creamfinance.PodIP, len(obj_list))
for i, ins := range obj_list {
ep_instances[i] = ins.(*creamfinance.PodIP)
}
return ep_instances
}
func (nw *Watcher) ListPodsByNamespaceAndLabels(namespace string, labelsToMatch labels.Set) (ret []*api.Pod, err error) {
podLister := listers.NewPodLister(nw.lister)
allMatchedNameSpacePods, err := podLister.Pods(namespace).List(labelsToMatch.AsSelector())
if err != nil {
return nil, err
}
return allMatchedNameSpacePods, nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment