This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/pkg/syncer/upsync/upsync_controller.go b/pkg/syncer/upsync/upsync_controller.go | |
index 6759192ce..aceb52f32 100644 | |
--- a/pkg/syncer/upsync/upsync_controller.go | |
+++ b/pkg/syncer/upsync/upsync_controller.go | |
@@ -27,7 +27,6 @@ import ( | |
"github.com/kcp-dev/logicalcluster/v3" | |
corev1 "k8s.io/api/core/v1" | |
- "k8s.io/apimachinery/pkg/api/equality" | |
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" | |
"k8s.io/apimachinery/pkg/runtime/schema" | |
"k8s.io/apimachinery/pkg/types" | |
@@ -43,6 +42,9 @@ import ( | |
ddsif "github.com/kcp-dev/kcp/pkg/informer" | |
"github.com/kcp-dev/kcp/pkg/logging" | |
"github.com/kcp-dev/kcp/pkg/syncer/shared" | |
+ utilerrors "k8s.io/apimachinery/pkg/util/errors" | |
+ "k8s.io/apimachinery/pkg/api/errors" | |
+ "sync" | |
) | |
const controllerName = "kcp-resource-upsyncer" | |
@@ -109,7 +111,7 @@ func NewUpSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalcluster. | |
if new.GetLabels()[workloadv1alpha1.ClusterResourceStateLabelPrefix+syncTargetKey] != string(workloadv1alpha1.ResourceStateUpsync) { | |
return | |
} | |
- c.enqueue(gvr, obj, logger, false, new.UnstructuredContent()["status"] != nil) | |
+ c.enqueue(gvr, obj, logger) | |
}, | |
UpdateFunc: func(gvr schema.GroupVersionResource, oldObj, newObj interface{}) { | |
if gvr == namespaceGVR { | |
@@ -128,11 +130,7 @@ func NewUpSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalcluster. | |
if new.GetLabels()[workloadv1alpha1.ClusterResourceStateLabelPrefix+syncTargetKey] != string(workloadv1alpha1.ResourceStateUpsync) { | |
return | |
} | |
- | |
- oldStatus := old.UnstructuredContent()["status"] | |
- newStatus := new.UnstructuredContent()["status"] | |
- isStatusUpdated := newStatus != nil && !equality.Semantic.DeepEqual(oldStatus, newStatus) | |
- c.enqueue(gvr, newObj, logger, false, isStatusUpdated) | |
+ c.enqueue(gvr, newObj, logger) | |
}, | |
DeleteFunc: func(gvr schema.GroupVersionResource, obj interface{}) { | |
if gvr == namespaceGVR { | |
@@ -151,7 +149,7 @@ func NewUpSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalcluster. | |
} | |
// TODO(davidfestal): do we want to extract the namespace from where the resource was deleted, | |
// as done in the SpecController ? | |
- c.enqueue(gvr, obj, logger, false, false) | |
+ c.enqueue(gvr, unstr, logger) | |
}, | |
}) | |
@@ -168,6 +166,8 @@ func NewUpSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalcluster. | |
type controller struct { | |
queue workqueue.RateLimitingInterface | |
+ dirtyStatusKeys sync.Map | |
+ | |
upstreamClient kcpdynamic.ClusterInterface | |
downstreamClient dynamic.Interface | |
@@ -182,24 +182,97 @@ type controller struct { | |
// Queue handles keys for both upstream and downstream resources. | |
type queueKey struct { | |
- gvr schema.GroupVersionResource | |
- key string | |
- // indicates whether it's an upstream key | |
- isUpstream bool | |
- includeStatus bool | |
+ gvr schema.GroupVersionResource | |
+ clusterName string | |
+ upstreamNamespace string | |
+ name string | |
} | |
-func (c *controller) enqueue(gvr schema.GroupVersionResource, obj interface{}, logger logr.Logger, isUpstream, includeStatus bool) { | |
- getKey := cache.DeletionHandlingMetaNamespaceKeyFunc | |
- if isUpstream { | |
- getKey = kcpcache.DeletionHandlingMetaClusterNamespaceKeyFunc | |
+func (c *controller) enqueue(gvr schema.GroupVersionResource, obj *unstructured.Unstructured, logger logr.Logger) { | |
+ logger := logging.WithReconciler(klog.Background(), controllerName) | |
+ key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) | |
+ if err != nil { | |
+ runtime.HandleError(err) | |
+ return | |
} | |
- key, err := getKey(obj) | |
+ ns, name, err := cache.SplitMetaNamespaceKey(key) | |
if err != nil { | |
runtime.HandleError(err) | |
return | |
} | |
- logging.WithQueueKey(logger, key).V(2).Info("queueing GVR", "gvr", gvr.String(), "isUpstream", isUpstream, "includeStatus", includeStatus) | |
+ | |
+ | |
+ ===== | |
+ | |
+ | |
+ downstreamNamespace := obj.GetNamespace() | |
+ | |
+ var locatorHolder metav1.Object | |
+ if downstreamNamespace != "" { | |
+ downstreamNamespaceLister, err := c.getDownstreamLister(corev1.SchemeGroupVersion.WithResource("namespaces")) | |
+ if err != nil { | |
+ return err | |
+ } | |
+ nsObj, err := downstreamNamespaceLister.Get(downstreamNamespace) | |
+ if k8serror.IsNotFound(err) { | |
+ // Since the namespace is already deleted downstream, so that we can't get the namespace locator, | |
+ // we won't be able to delete the resource upstream. | |
+ // | |
+ logger.Error(err, "the downstream namespace doesn't exist anymore.") | |
+ return nil | |
+ } | |
+ if err != nil { | |
+ logger.Error(err, "error getting downstream Namespace") | |
+ return err | |
+ } | |
+ if nsMetav1, ok := nsObj.(metav1.Object); !ok { | |
+ return fmt.Errorf("downstream ns expected to be metav1.Object got %T", nsObj) | |
+ } else { | |
+ locatorHolder = nsMetav1 | |
+ } | |
+ } | |
+ | |
+ // retrieve downstream object | |
+ var downstreamObject runtime.Object | |
+ if downstreamNamespace != "" { | |
+ downstreamObject, err = downstreamLister.ByNamespace(downstreamNamespace).Get(downstreamName) | |
+ } else { | |
+ downstreamObject, err = downstreamLister.Get(downstreamName) | |
+ } | |
+ if err != nil && !k8serror.IsNotFound(err) { | |
+ return err | |
+ } | |
+ | |
+ var downstreamResource *unstructured.Unstructured | |
+ if !k8serror.IsNotFound(err) { | |
+ var ok bool | |
+ downstreamResource, ok = downstreamObject.(*unstructured.Unstructured) | |
+ if !ok { | |
+ return fmt.Errorf("type mismatch of resource object: received %T", downstreamResource) | |
+ } | |
+ if downstreamNamespace == "" { | |
+ locatorHolder = downstreamResource | |
+ } | |
+ } | |
+ | |
+ var upstreamLocator *shared.NamespaceLocator | |
+ if locatorHolder != nil { | |
+ if locator, locatorExists, err := shared.LocatorFromAnnotations(locatorHolder.GetAnnotations()); err != nil { | |
+ return err | |
+ } else if locatorExists && locator != nil { | |
+ upstreamLocator = locator | |
+ } | |
+ } | |
+ | |
+ if upstreamLocator == nil { | |
+ logger.Error(err, "locator not found in the downstream resource: related upstream resource cannot be updated") | |
+ return nil | |
+ } | |
+ | |
+ | |
+ ==== | |
+ | |
+ logging.With(logger, obj).V(2).Info("queueing GVR", "gvr", gvr.String()) | |
c.queue.Add( | |
queueKey{ | |
gvr: gvr, | |
@@ -231,23 +304,59 @@ func (c *controller) startWorker(ctx context.Context) { | |
} | |
func (c *controller) processNextWorkItem(ctx context.Context) bool { | |
- key, quit := c.queue.Get() | |
+ // Wait until there is a new item in the working queue | |
+ k, quit := c.queue.Get() | |
if quit { | |
return false | |
} | |
- qk := key.(queueKey) | |
- logger := logging.WithQueueKey(klog.FromContext(ctx), qk.key).WithValues("gvr", qk.gvr.String(), "isUpstream", qk.isUpstream, "includeStatus", qk.includeStatus) | |
+ key := k.(string) | |
+ | |
+ logger := logging.WithQueueKey(klog.FromContext(ctx), key) | |
ctx = klog.NewContext(ctx, logger) | |
- logger.V(1).Info("Processing key") | |
+ logger.V(1).Info("processing key") | |
+ | |
+ // No matter what, tell the queue we're done with this key, to unblock | |
+ // other workers. | |
defer c.queue.Done(key) | |
- if err := c.process(ctx, qk.gvr, qk.key, qk.isUpstream, qk.includeStatus); err != nil { | |
- runtime.HandleError(fmt.Errorf("%s failed to upsync %q, err: %w", controllerName, key, err)) | |
+ if requeue, err := c.process(ctx, key); err != nil { | |
+ runtime.HandleError(fmt.Errorf("%q controller failed to sync %q, err: %w", controllerName, key, err)) | |
c.queue.AddRateLimited(key) | |
return true | |
+ } else if requeue { | |
+ // only requeue if we didn't error, but we still want to requeue | |
+ c.queue.Add(key) | |
+ return true | |
} | |
- | |
c.queue.Forget(key) | |
- | |
return true | |
} | |
+ | |
+func (c *controller) process(ctx context.Context, key string) (bool, error) { | |
+ parent, _, name, err := kcpcache.SplitMetaClusterNamespaceKey(key) | |
+ if err != nil { | |
+ runtime.HandleError(err) | |
+ return false, nil | |
+ } | |
+ | |
+ dirtyStatus, found := c.dirtyStatusKeys.LoadAndDelete(k) | |
+ if !found { | |
+ dirtyStatus = false | |
+ } | |
+ | |
+ upstreamObj, err := c.ddsif....Cluster(parent).Get(name) | |
+ if err != nil && !errors.IsNotFound(err) { | |
+ return false, err | |
+ } | |
+ | |
+ logger := logging.WithObject(klog.FromContext(ctx), upstreamObj) | |
+ ctx = klog.NewContext(ctx, logger) | |
+ | |
+ var errs []error | |
+ requeue, err := c.reconcile(ctx, gvr, clusterName, ns, name, upstreamObj) | |
+ if err != nil { | |
+ errs = append(errs, err) | |
+ } | |
+ | |
+ return requeue, utilerrors.NewAggregate(errs) | |
+} | |
diff --git a/pkg/syncer/upsync/upsync_process.go b/pkg/syncer/upsync/upsync_process.go | |
index 27a82ffa4..5a6d8deca 100644 | |
--- a/pkg/syncer/upsync/upsync_process.go | |
+++ b/pkg/syncer/upsync/upsync_process.go | |
@@ -44,7 +44,7 @@ const ( | |
// when it was last upsynced. | |
// It is used to check easily, without having to compare the resource contents, | |
// whether an upsynced upstream resource is up-to-date with the downstream resource. | |
- ResourceVersionAnnotation = "kcp.io/resource-version" | |
+ ResourceVersionAnnotation = "workload.kcp.io/rv" | |
) | |
func (c *controller) processUpstreamResource(ctx context.Context, gvr schema.GroupVersionResource, key string) error { | |
@@ -109,8 +109,9 @@ func (c *controller) processUpstreamResource(ctx context.Context, gvr schema.Gro | |
return err | |
} | |
-func (c *controller) processDownstreamResource(ctx context.Context, gvr schema.GroupVersionResource, key string, includeStatus bool) error { | |
+func (c *controller) reconcile(ctx context.Context, gvr schema.GroupVersionResource, clusterName, ns, name string, obj *unstructured.Unstructured) error { | |
logger := klog.FromContext(ctx) | |
+ | |
downstreamNamespace, downstreamName, err := cache.SplitMetaNamespaceKey(key) | |
if err != nil { | |
logger.Error(err, "Invalid key") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment