@sttts
Created February 13, 2023 11:05
diff --git a/pkg/syncer/upsync/upsync_controller.go b/pkg/syncer/upsync/upsync_controller.go
index 6759192ce..aceb52f32 100644
--- a/pkg/syncer/upsync/upsync_controller.go
+++ b/pkg/syncer/upsync/upsync_controller.go
@@ -27,7 +27,6 @@ import (
"github.com/kcp-dev/logicalcluster/v3"
corev1 "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
@@ -43,6 +42,9 @@ import (
ddsif "github.com/kcp-dev/kcp/pkg/informer"
"github.com/kcp-dev/kcp/pkg/logging"
"github.com/kcp-dev/kcp/pkg/syncer/shared"
+ "sync"
+ k8serror "k8s.io/apimachinery/pkg/api/errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ k8sruntime "k8s.io/apimachinery/pkg/runtime"
+ utilerrors "k8s.io/apimachinery/pkg/util/errors"
)
const controllerName = "kcp-resource-upsyncer"
@@ -109,7 +111,7 @@ func NewUpSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalcluster.
if new.GetLabels()[workloadv1alpha1.ClusterResourceStateLabelPrefix+syncTargetKey] != string(workloadv1alpha1.ResourceStateUpsync) {
return
}
- c.enqueue(gvr, obj, logger, false, new.UnstructuredContent()["status"] != nil)
+ c.enqueue(gvr, new, logger)
},
UpdateFunc: func(gvr schema.GroupVersionResource, oldObj, newObj interface{}) {
if gvr == namespaceGVR {
@@ -128,11 +130,7 @@ func NewUpSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalcluster.
if new.GetLabels()[workloadv1alpha1.ClusterResourceStateLabelPrefix+syncTargetKey] != string(workloadv1alpha1.ResourceStateUpsync) {
return
}
-
- oldStatus := old.UnstructuredContent()["status"]
- newStatus := new.UnstructuredContent()["status"]
- isStatusUpdated := newStatus != nil && !equality.Semantic.DeepEqual(oldStatus, newStatus)
- c.enqueue(gvr, newObj, logger, false, isStatusUpdated)
+ c.enqueue(gvr, new, logger)
},
DeleteFunc: func(gvr schema.GroupVersionResource, obj interface{}) {
if gvr == namespaceGVR {
@@ -151,7 +149,7 @@ func NewUpSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalcluster.
}
// TODO(davidfestal): do we want to extract the namespace from where the resource was deleted,
// as done in the SpecController ?
- c.enqueue(gvr, obj, logger, false, false)
+ c.enqueue(gvr, unstr, logger)
},
})
@@ -168,6 +166,8 @@ func NewUpSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalcluster.
type controller struct {
queue workqueue.RateLimitingInterface
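+ // dirtyStatusKeys tracks, per queue key, whether a downstream status
+ // change has been observed since the last sync. (sketch comment: the
+ // code that stores into this map is not part of this diff)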
+ dirtyStatusKeys sync.Map
+
upstreamClient kcpdynamic.ClusterInterface
downstreamClient dynamic.Interface
@@ -182,24 +182,97 @@ type controller struct {
// Queue handles keys for both upstream and downstream resources.
type queueKey struct {
- gvr schema.GroupVersionResource
- key string
- // indicates whether it's an upstream key
- isUpstream bool
- includeStatus bool
+ gvr schema.GroupVersionResource
+ clusterName string
+ upstreamNamespace string
+ name string
}
-func (c *controller) enqueue(gvr schema.GroupVersionResource, obj interface{}, logger logr.Logger, isUpstream, includeStatus bool) {
- getKey := cache.DeletionHandlingMetaNamespaceKeyFunc
- if isUpstream {
- getKey = kcpcache.DeletionHandlingMetaClusterNamespaceKeyFunc
+func (c *controller) enqueue(gvr schema.GroupVersionResource, obj *unstructured.Unstructured, logger logr.Logger) {
+ key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
+ if err != nil {
+ runtime.HandleError(err)
+ return
}
- key, err := getKey(obj)
+ downstreamNamespace, downstreamName, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
runtime.HandleError(err)
return
}
- logging.WithQueueKey(logger, key).V(2).Info("queueing GVR", "gvr", gvr.String(), "isUpstream", isUpstream, "includeStatus", includeStatus)
+
+
+ =====
+
+
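+ // (pasted sketch) Resolve the upstream namespace locator for the
+ // downstream object: namespaced resources carry it on their downstream
+ // namespace, cluster-scoped resources on the object itself. The error
+ // returns below assume this ends up in a method that returns error,
+ // not in enqueue itself.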
+
+ var locatorHolder metav1.Object
+ if downstreamNamespace != "" {
+ downstreamNamespaceLister, err := c.getDownstreamLister(corev1.SchemeGroupVersion.WithResource("namespaces"))
+ if err != nil {
+ return err
+ }
+ nsObj, err := downstreamNamespaceLister.Get(downstreamNamespace)
+ if k8serror.IsNotFound(err) {
+ // The namespace has already been deleted downstream, so we can't get
+ // the namespace locator, and hence can't update the resource upstream.
+ logger.Error(err, "downstream namespace doesn't exist anymore")
+ return nil
+ }
+ if err != nil {
+ logger.Error(err, "error getting downstream Namespace")
+ return err
+ }
+ if nsMetav1, ok := nsObj.(metav1.Object); !ok {
+ return fmt.Errorf("downstream ns expected to be metav1.Object got %T", nsObj)
+ } else {
+ locatorHolder = nsMetav1
+ }
+ }
+
+ // retrieve the downstream object through the downstream lister for this GVR
+ downstreamLister, err := c.getDownstreamLister(gvr)
+ if err != nil {
+ return err
+ }
+ var downstreamObject k8sruntime.Object
+ if downstreamNamespace != "" {
+ downstreamObject, err = downstreamLister.ByNamespace(downstreamNamespace).Get(downstreamName)
+ } else {
+ downstreamObject, err = downstreamLister.Get(downstreamName)
+ }
+ if err != nil && !k8serror.IsNotFound(err) {
+ return err
+ }
+
+ var downstreamResource *unstructured.Unstructured
+ if !k8serror.IsNotFound(err) {
+ var ok bool
+ downstreamResource, ok = downstreamObject.(*unstructured.Unstructured)
+ if !ok {
+ return fmt.Errorf("type mismatch of resource object: received %T", downstreamResource)
+ }
+ if downstreamNamespace == "" {
+ locatorHolder = downstreamResource
+ }
+ }
+
+ var upstreamLocator *shared.NamespaceLocator
+ if locatorHolder != nil {
+ if locator, locatorExists, err := shared.LocatorFromAnnotations(locatorHolder.GetAnnotations()); err != nil {
+ return err
+ } else if locatorExists && locator != nil {
+ upstreamLocator = locator
+ }
+ }
+
+ if upstreamLocator == nil {
+ logger.Error(err, "locator not found in the downstream resource: related upstream resource cannot be updated")
+ return nil
+ }
+
+
+ =====
+
+ logging.WithObject(logger, obj).V(2).Info("queueing GVR", "gvr", gvr.String())
c.queue.Add(
queueKey{
gvr: gvr,
@@ -231,23 +304,59 @@ func (c *controller) startWorker(ctx context.Context) {
}
func (c *controller) processNextWorkItem(ctx context.Context) bool {
- key, quit := c.queue.Get()
+ // Wait until there is a new item in the working queue
+ k, quit := c.queue.Get()
if quit {
return false
}
- qk := key.(queueKey)
- logger := logging.WithQueueKey(klog.FromContext(ctx), qk.key).WithValues("gvr", qk.gvr.String(), "isUpstream", qk.isUpstream, "includeStatus", qk.includeStatus)
+ key := k.(string)
+
+ logger := logging.WithQueueKey(klog.FromContext(ctx), key)
ctx = klog.NewContext(ctx, logger)
- logger.V(1).Info("Processing key")
+ logger.V(1).Info("processing key")
+
+ // No matter what, tell the queue we're done with this key, to unblock
+ // other workers.
defer c.queue.Done(key)
- if err := c.process(ctx, qk.gvr, qk.key, qk.isUpstream, qk.includeStatus); err != nil {
- runtime.HandleError(fmt.Errorf("%s failed to upsync %q, err: %w", controllerName, key, err))
+ if requeue, err := c.process(ctx, key); err != nil {
+ runtime.HandleError(fmt.Errorf("%q controller failed to sync %q, err: %w", controllerName, key, err))
c.queue.AddRateLimited(key)
return true
+ } else if requeue {
+ // only requeue if we didn't error, but we still want to requeue
+ c.queue.Add(key)
+ return true
}
-
c.queue.Forget(key)
-
return true
}
+
+func (c *controller) process(ctx context.Context, key string) (bool, error) {
+ parent, ns, name, err := kcpcache.SplitMetaClusterNamespaceKey(key)
+ if err != nil {
+ runtime.HandleError(err)
+ return false, nil
+ }
+
+ dirtyStatus, found := c.dirtyStatusKeys.LoadAndDelete(key)
+ if !found {
+ dirtyStatus = false
+ }
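+ // NOTE (sketch): dirtyStatus is not yet threaded into reconcile below;
+ // presumably it will gate whether the status subresource is upsynced.
+ _ = dirtyStatus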
+
+ upstreamObj, err := c.ddsif....Cluster(parent).Get(name)
+ if err != nil && !k8serror.IsNotFound(err) {
+ return false, err
+ }
+
+ logger := logging.WithObject(klog.FromContext(ctx), upstreamObj)
+ ctx = klog.NewContext(ctx, logger)
+
+ var errs []error
+ requeue, err := c.reconcile(ctx, gvr, parent, ns, name, upstreamObj)
+ if err != nil {
+ errs = append(errs, err)
+ }
+
+ return requeue, utilerrors.NewAggregate(errs)
+}
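+
+// A possible writing side for dirtyStatusKeys, sketched here as a comment
+// (it is not part of this diff): the update handler would compute
+// isStatusUpdated as the removed code did, and mark the key dirty instead
+// of passing flags through the queue:
+//
+//	if isStatusUpdated {
+//		c.dirtyStatusKeys.Store(key, true)
+//	}
+//	c.queue.Add(key)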
diff --git a/pkg/syncer/upsync/upsync_process.go b/pkg/syncer/upsync/upsync_process.go
index 27a82ffa4..5a6d8deca 100644
--- a/pkg/syncer/upsync/upsync_process.go
+++ b/pkg/syncer/upsync/upsync_process.go
@@ -44,7 +44,7 @@ const (
// when it was last upsynced.
// It is used to check easily, without having to compare the resource contents,
// whether an upsynced upstream resource is up-to-date with the downstream resource.
- ResourceVersionAnnotation = "kcp.io/resource-version"
+ ResourceVersionAnnotation = "workload.kcp.io/rv"
)
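+
+// A minimal sketch (not part of this diff) of the up-to-date check this
+// annotation enables, assuming upstreamObj and downstreamObj are the
+// unstructured copies of the upstream and downstream resource:
+//
+//	if upstreamObj.GetAnnotations()[ResourceVersionAnnotation] == downstreamObj.GetResourceVersion() {
+//		return nil // upstream copy is already up-to-date, skip upsync
+//	}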
func (c *controller) processUpstreamResource(ctx context.Context, gvr schema.GroupVersionResource, key string) error {
@@ -109,8 +109,9 @@ func (c *controller) processUpstreamResource(ctx context.Context, gvr schema.Gro
return err
}
-func (c *controller) processDownstreamResource(ctx context.Context, gvr schema.GroupVersionResource, key string, includeStatus bool) error {
+func (c *controller) reconcile(ctx context.Context, gvr schema.GroupVersionResource, clusterName, ns, name string, obj *unstructured.Unstructured) (bool, error) {
logger := klog.FromContext(ctx)
+
downstreamNamespace, downstreamName, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
logger.Error(err, "Invalid key")