Skip to content

Instantly share code, notes, and snippets.

@paha
Created August 2, 2017 23:23
Show Gist options
  • Save paha/d33847efefd06c4420ea00bd8678fa6e to your computer and use it in GitHub Desktop.
Save paha/d33847efefd06c4420ea00bd8678fa6e to your computer and use it in GitHub Desktop.
/*
Copyright 2016 The Rook Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Some of the code below came from https://github.com/coreos/etcd-operator
which also has the apache 2.0 license.
*/
// Kube controller watching PVC events.
// * Provides ceph user credentials, ensuring each namespace that uses Rook,
// has ability to manage lifecycle of ceph primitives on the rook cluster.
package main
import (
"fmt"
"time"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/api/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
func main() {
fmt.Println("Starting")
client, err := getClient()
if err != nil {
panic(err)
}
c := &PVCController{
client: client,
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
}
c.makePVCController()
stop := make(chan struct{})
defer close(stop)
go c.run(stop)
// Wait forever
select {}
}
var (
// controllerResyncPeriod potentially can be part of rook Operator configuration.
// in seconds
controllerResyncPeriod time.Duration = 120
// controllerRetry is number of retries to process an event
controllerRetry int = 5
)
// PVCController implements Kube controller that monitors PVC events.
// The controller ensures all kube namespaces have a set of credentials
// to manage lifeCycle of ceph primitives.
type PVCController struct {
client *rest.RESTClient
queue workqueue.RateLimitingInterface
indexer cache.Indexer
informer cache.Controller
}
func getClient() (*rest.RESTClient, error) {
config, err := rest.InClusterConfig()
if err != nil {
return nil, err
}
return rest.RESTClientFor(config)
}
func (c *PVCController) makePVCController() {
eventList := cache.NewListWatchFromClient(
c.client,
"persistentvolumeclaims",
v1.NamespaceAll,
fields.Everything(),
)
c.indexer, c.informer = cache.NewIndexerInformer(
eventList,
&v1.PersistentVolumeClaim{},
controllerResyncPeriod*time.Second,
cache.ResourceEventHandlerFuncs{
AddFunc: c.onAdd,
UpdateFunc: c.onUpdate,
DeleteFunc: c.onDelete,
},
cache.Indexers{},
)
}
func (c *PVCController) processNextItem() bool {
key, quit := c.queue.Get()
if quit {
return false
}
defer c.queue.Done(key)
err := c.pvcHandler(key)
c.handleErr(err, key)
return true
}
func (c *PVCController) pvcHandler(key interface{}) error {
obj, exists, err := c.indexer.GetByKey(key.(string))
if err != nil {
return err
}
if exists {
fmt.Println("Processing pvc", obj.(*v1.PersistentVolumeClaim))
}
return err
}
func (c *PVCController) run(stopCh chan struct{}) {
defer runtime.HandleCrash()
defer c.queue.ShutDown()
fmt.Println("Starting PVC controller.")
go c.informer.Run(stopCh)
// Wait for all involved caches to be synced, before processing items from the queue is started
if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
return
}
go wait.Until(c.runWorker, time.Second, stopCh)
<-stopCh
fmt.Println("Stopping PVC controller.")
}
func (c *PVCController) runWorker() {
for c.processNextItem() {
}
}
func (c *PVCController) onAdd(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
c.queue.Add(key)
}
}
func (c *PVCController) onUpdate(obj, oldObj interface{}) {
fmt.Println("Update action", obj, oldObj)
}
func (c *PVCController) onDelete(obj interface{}) {
fmt.Println("Update action", obj)
}
func (c *PVCController) handleErr(err error, obj interface{}) {
if err == nil {
c.queue.Forget(obj)
return
}
if c.queue.NumRequeues(obj) < controllerRetry {
fmt.Println("Error processing pvc", obj, err)
c.queue.AddRateLimited(obj)
return
}
fmt.Println("Gave up on pvc event", obj, err)
c.queue.Forget(obj)
runtime.HandleError(err)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment