Skip to content

Instantly share code, notes, and snippets.

@ncdc
Created March 30, 2020 19:50
Show Gist options
  • Save ncdc/8b825bcc0d5d27774833d2309c9bb088 to your computer and use it in GitHub Desktop.
Save ncdc/8b825bcc0d5d27774833d2309c9bb088 to your computer and use it in GitHub Desktop.
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package remote
import (
"context"
"sync"
"github.com/go-logr/logr"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/runtime"
clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
type ClusterCache interface {
Start()
Stop()
}
type clusterCache struct {
cache cache.Cache
stop chan struct{}
}
func (cc *clusterCache) Start() {
cc.cache.Start(cc.stop)
}
func (cc *clusterCache) Stop() {
close(cc.stop)
}
type ClusterCacheManager struct {
log logr.Logger
managementClusterClient client.Client
scheme runtime.Scheme
lock sync.RWMutex
clusterCaches map[client.ObjectKey]ClusterCache
// For testing
newClusterCache func(ctx context.Context, cluster client.ObjectKey) (ClusterCache, error)
}
type NewClusterCacheManagerInput struct {
Log logr.Logger
Manager ctrl.Manager
ManagementClusterClient client.Client
Scheme runtime.Scheme
ControllerOptions controller.Options
}
func NewClusterCacheManager(input NewClusterCacheManagerInput) (*ClusterCacheManager, error) {
m := &ClusterCacheManager{
log: input.Log,
managementClusterClient: input.ManagementClusterClient,
scheme: input.Scheme,
clusterCaches: make(map[client.ObjectKey]ClusterCache),
}
m.newClusterCache = m.defaultNewClusterCache
_, err := ctrl.NewControllerManagedBy(input.Manager).
For(&clusterv1.Cluster{}).
WithOptions(input.ControllerOptions).
Build(m)
if err != nil {
return nil, errors.Wrap(err, "failed to create cluster cache manager controller")
}
return m, nil
}
func (m *ClusterCacheManager) ClusterCache(ctx context.Context, cluster client.ObjectKey) (ClusterCache, error) {
cache := m.getClusterCache(cluster)
if cache != nil {
return cache, nil
}
return m.newClusterCache(ctx, cluster)
}
func (m *ClusterCacheManager) getClusterCache(cluster client.ObjectKey) ClusterCache {
m.lock.RLock()
defer m.lock.RUnlock()
return m.clusterCaches[cluster]
}
func (m *ClusterCacheManager) defaultNewClusterCache(ctx context.Context, cluster client.ObjectKey) (ClusterCache, error) {
m.lock.Lock()
defer m.lock.Unlock()
// If another goroutine created the cache while this one was waiting to acquire the write lock, return that
// instead of overwriting it.
c, exists := m.clusterCaches[cluster]
if exists {
return c, nil
}
config, err := RESTConfig(ctx, m.managementClusterClient, cluster)
if err != nil {
return nil, errors.Wrap(err, "error fetching remote cluster config")
}
remoteCache, err := cache.New(config, cache.Options{})
if err != nil {
return nil, errors.Wrap(err, "error creating cache for remote cluster")
}
stop := make(chan struct{})
c = &clusterCache{
cache: remoteCache,
stop: stop,
}
return c, nil
}
func (m *ClusterCacheManager) Reconcile(req reconcile.Request) (reconcile.Result, error) {
ctx := context.Background()
log := m.log.WithValues("namespace", req.Namespace, "name", req.Name)
log.V(4).Info("Reconciling")
var cluster clusterv1.Cluster
err := m.managementClusterClient.Get(ctx, req.NamespacedName, &cluster)
if err == nil {
log.V(4).Info("Cluster still exists")
return reconcile.Result{}, nil
}
log.V(4).Info("Error retrieving cluster", "error", err.Error())
c := m.getClusterCache(req.NamespacedName)
if c == nil {
log.V(4).Info("No current cluster cache exists - nothing to do")
}
log.V(4).Info("Stopping cluster cache")
c.Stop()
return reconcile.Result{}, nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment