Created
August 2, 2017 23:23
-
-
Save paha/d33847efefd06c4420ea00bd8678fa6e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
Copyright 2016 The Rook Authors. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

	http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

Some of the code below came from https://github.com/coreos/etcd-operator
which also has the apache 2.0 license.
*/
// Kube controller watching PVC events.
//   - Provides Ceph user credentials, ensuring that each namespace that uses
//     Rook has the ability to manage the lifecycle of Ceph primitives on the
//     Rook cluster.
package main | |
import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)
func main() { | |
fmt.Println("Starting") | |
client, err := getClient() | |
if err != nil { | |
panic(err) | |
} | |
c := &PVCController{ | |
client: client, | |
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), | |
} | |
c.makePVCController() | |
stop := make(chan struct{}) | |
defer close(stop) | |
go c.run(stop) | |
// Wait forever | |
select {} | |
} | |
var (
	// controllerResyncPeriod is the informer resync interval expressed as a
	// plain count of seconds; makePVCController multiplies it by time.Second
	// before use. It could potentially become part of the rook Operator
	// configuration.
	controllerResyncPeriod = time.Duration(120)

	// controllerRetry is the number of times a failed event is requeued
	// before the controller gives up on it.
	controllerRetry = 5
)
// PVCController implements Kube controller that monitors PVC events. | |
// The controller ensures all kube namespaces have a set of credentials | |
// to manage lifeCycle of ceph primitives. | |
type PVCController struct { | |
client *rest.RESTClient | |
queue workqueue.RateLimitingInterface | |
indexer cache.Indexer | |
informer cache.Controller | |
} | |
func getClient() (*rest.RESTClient, error) { | |
config, err := rest.InClusterConfig() | |
if err != nil { | |
return nil, err | |
} | |
return rest.RESTClientFor(config) | |
} | |
func (c *PVCController) makePVCController() { | |
eventList := cache.NewListWatchFromClient( | |
c.client, | |
"persistentvolumeclaims", | |
v1.NamespaceAll, | |
fields.Everything(), | |
) | |
c.indexer, c.informer = cache.NewIndexerInformer( | |
eventList, | |
&v1.PersistentVolumeClaim{}, | |
controllerResyncPeriod*time.Second, | |
cache.ResourceEventHandlerFuncs{ | |
AddFunc: c.onAdd, | |
UpdateFunc: c.onUpdate, | |
DeleteFunc: c.onDelete, | |
}, | |
cache.Indexers{}, | |
) | |
} | |
func (c *PVCController) processNextItem() bool { | |
key, quit := c.queue.Get() | |
if quit { | |
return false | |
} | |
defer c.queue.Done(key) | |
err := c.pvcHandler(key) | |
c.handleErr(err, key) | |
return true | |
} | |
func (c *PVCController) pvcHandler(key interface{}) error { | |
obj, exists, err := c.indexer.GetByKey(key.(string)) | |
if err != nil { | |
return err | |
} | |
if exists { | |
fmt.Println("Processing pvc", obj.(*v1.PersistentVolumeClaim)) | |
} | |
return err | |
} | |
func (c *PVCController) run(stopCh chan struct{}) { | |
defer runtime.HandleCrash() | |
defer c.queue.ShutDown() | |
fmt.Println("Starting PVC controller.") | |
go c.informer.Run(stopCh) | |
// Wait for all involved caches to be synced, before processing items from the queue is started | |
if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) { | |
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync")) | |
return | |
} | |
go wait.Until(c.runWorker, time.Second, stopCh) | |
<-stopCh | |
fmt.Println("Stopping PVC controller.") | |
} | |
func (c *PVCController) runWorker() { | |
for c.processNextItem() { | |
} | |
} | |
func (c *PVCController) onAdd(obj interface{}) { | |
key, err := cache.MetaNamespaceKeyFunc(obj) | |
if err == nil { | |
c.queue.Add(key) | |
} | |
} | |
func (c *PVCController) onUpdate(obj, oldObj interface{}) { | |
fmt.Println("Update action", obj, oldObj) | |
} | |
func (c *PVCController) onDelete(obj interface{}) { | |
fmt.Println("Update action", obj) | |
} | |
func (c *PVCController) handleErr(err error, obj interface{}) { | |
if err == nil { | |
c.queue.Forget(obj) | |
return | |
} | |
if c.queue.NumRequeues(obj) < controllerRetry { | |
fmt.Println("Error processing pvc", obj, err) | |
c.queue.AddRateLimited(obj) | |
return | |
} | |
fmt.Println("Gave up on pvc event", obj, err) | |
c.queue.Forget(obj) | |
runtime.HandleError(err) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment