Skip to content

Instantly share code, notes, and snippets.

@fabianvf
Created August 14, 2019 20:47
Show Gist options
  • Save fabianvf/7cb795603a7559a12a3c81eeee3ba4bb to your computer and use it in GitHub Desktop.
Save fabianvf/7cb795603a7559a12a3c81eeee3ba4bb to your computer and use it in GitHub Desktop.
diff --git a/pkg/ansible/controller/controller.go b/pkg/ansible/controller/controller.go
index 02accab2..28bdeb92 100644
--- a/pkg/ansible/controller/controller.go
+++ b/pkg/ansible/controller/controller.go
@@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
+ "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
crthandler "sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
@@ -57,9 +58,20 @@ func Add(mgr manager.Manager, options Options) *controller.Controller {
options.EventHandlers = []events.EventHandler{}
}
eventHandlers := append(options.EventHandlers, events.NewLoggingEventHandler(options.LoggingLevel))
+ apiReader, err := client.New(mgr.GetConfig(), client.Options{})
+ if err != nil {
+ log.Error(err, "Unable to get new api client")
+ }
aor := &AnsibleOperatorReconciler{
- Client: mgr.GetClient(),
+ // The default client will use the DelegatingReader for reads
+ // this forces it to use the cache for unstructured types.
+ Client: client.DelegatingClient{
+ Reader: mgr.GetCache(),
+ Writer: mgr.GetClient(),
+ StatusClient: mgr.GetClient(),
+ },
+ APIReader: apiReader,
GVK: options.GVK,
Runner: options.Runner,
EventHandlers: eventHandlers,
@@ -68,7 +80,7 @@ func Add(mgr manager.Manager, options Options) *controller.Controller {
}
scheme := mgr.GetScheme()
- _, err := scheme.New(options.GVK)
+ _, err = scheme.New(options.GVK)
if runtime.IsNotRegisteredError(err) {
// Register the GVK with the schema
scheme.AddKnownTypeWithName(options.GVK, &unstructured.Unstructured{})
diff --git a/pkg/ansible/controller/reconcile.go b/pkg/ansible/controller/reconcile.go
index 1a26424a..81688d81 100644
--- a/pkg/ansible/controller/reconcile.go
+++ b/pkg/ansible/controller/reconcile.go
@@ -55,6 +55,7 @@ type AnsibleOperatorReconciler struct {
GVK schema.GroupVersionKind
Runner runner.Runner
Client client.Client
+ APIReader client.Reader
EventHandlers []events.EventHandler
ReconcilePeriod time.Duration
ManageStatus bool
@@ -220,19 +221,16 @@ func (r *AnsibleOperatorReconciler) Reconcile(request reconcile.Request) (reconc
}
if r.ManageStatus {
err = r.markDone(u, request.NamespacedName, statusEvent, failureMessages)
- if err != nil {
- logger.Error(err, "Failed to mark status done")
+ if exit, err := determineReturn(err); exit {
+ return reconcileResult, err
}
+
}
return reconcileResult, err
}
func (r *AnsibleOperatorReconciler) markRunning(u *unstructured.Unstructured, namespacedName types.NamespacedName) error {
// Get the latest resource to prevent updating a stale status
- err := r.Client.Get(context.TODO(), namespacedName, u)
- if err != nil {
- return err
- }
statusInterface := u.Object["status"]
statusMap, _ := statusInterface.(map[string]interface{})
crStatus := ansiblestatus.CreateFromMap(statusMap)
@@ -256,7 +254,7 @@ func (r *AnsibleOperatorReconciler) markRunning(u *unstructured.Unstructured, na
)
ansiblestatus.SetCondition(&crStatus, *c)
u.Object["status"] = crStatus.GetJSONMap()
- err = r.Client.Status().Update(context.TODO(), u)
+ err := r.Client.Status().Update(context.TODO(), u)
if err != nil {
return err
}
@@ -306,16 +304,6 @@ func (r *AnsibleOperatorReconciler) markError(u *unstructured.Unstructured, name
}
func (r *AnsibleOperatorReconciler) markDone(u *unstructured.Unstructured, namespacedName types.NamespacedName, statusEvent eventapi.StatusJobEvent, failureMessages eventapi.FailureMessages) error {
- logger := logf.Log.WithName("markDone")
- // Get the latest resource to prevent updating a stale status
- err := r.Client.Get(context.TODO(), namespacedName, u)
- if apierrors.IsNotFound(err) {
- logger.Info("Resource not found, assuming it was deleted", err)
- return nil
- }
- if err != nil {
- return err
- }
statusInterface := u.Object["status"]
statusMap, _ := statusInterface.(map[string]interface{})
crStatus := ansiblestatus.CreateFromMap(statusMap)
@@ -363,3 +351,21 @@ func contains(l []string, s string) bool {
}
return false
}
+
+// determineReturn - if the object was updated outside of our controller
+// this means that the current reconcilation is over and we should use the
+// latest version. To do this, we just exit without error because the
+// latest version should be queued for update.
+func determineReturn(err error) (bool, error) {
+ exit := false
+ if err == nil {
+ return exit, err
+ }
+ exit = true
+
+ if apierrors.IsConflict(err) {
+ log.V(1).Info("Conflict found during an update; re-running reconcilation")
+ return exit, nil
+ }
+ return exit, err
+}
diff --git a/pkg/ansible/controller/reconcile_test.go b/pkg/ansible/controller/reconcile_test.go
index 2218b579..e0130241 100644
--- a/pkg/ansible/controller/reconcile_test.go
+++ b/pkg/ansible/controller/reconcile_test.go
@@ -487,6 +487,7 @@ func TestReconcile(t *testing.T) {
GVK: tc.GVK,
Runner: tc.Runner,
Client: tc.Client,
+ APIReader: tc.Client,
EventHandlers: tc.EventHandlers,
ReconcilePeriod: tc.ReconcilePeriod,
ManageStatus: tc.ManageStatus,
diff --git a/pkg/ansible/proxy/proxy_test.go b/pkg/ansible/proxy/proxy_test.go
index 65eab30d..3991cadb 100644
--- a/pkg/ansible/proxy/proxy_test.go
+++ b/pkg/ansible/proxy/proxy_test.go
@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+// +build !openshiftci
+
package proxy
import (
diff --git a/pkg/ansible/run.go b/pkg/ansible/run.go
index 7b9bd53d..ecc4f719 100644
--- a/pkg/ansible/run.go
+++ b/pkg/ansible/run.go
@@ -60,6 +60,8 @@ func printVersion() {
func Run(flags *aoflags.AnsibleOperatorFlags) error {
printVersion()
+ printVersion()
+
namespace, found := os.LookupEnv(k8sutil.WatchNamespaceEnvVar)
log = log.WithValues("Namespace", namespace)
if found {
diff --git a/pkg/ansible/runner/internal/inputdir/inputdir.go b/pkg/ansible/runner/internal/inputdir/inputdir.go
index cca03b50..3a58c892 100644
--- a/pkg/ansible/runner/internal/inputdir/inputdir.go
+++ b/pkg/ansible/runner/internal/inputdir/inputdir.go
@@ -22,9 +22,10 @@ import (
"path/filepath"
"strings"
- "github.com/operator-framework/operator-sdk/internal/util/fileutil"
"github.com/spf13/afero"
+ "github.com/operator-framework/operator-sdk/internal/util/fileutil"
+
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
)
diff --git a/pkg/helm/controller/controller.go b/pkg/helm/controller/controller.go
index 3d57b76c..093904a0 100644
--- a/pkg/helm/controller/controller.go
+++ b/pkg/helm/controller/controller.go
@@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
rpb "k8s.io/helm/pkg/proto/hapi/release"
+ "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
crthandler "sigs.k8s.io/controller-runtime/pkg/handler"
@@ -56,7 +57,13 @@ type WatchOptions struct {
// Add creates a new helm operator controller and adds it to the manager
func Add(mgr manager.Manager, options WatchOptions) error {
r := &HelmOperatorReconciler{
- Client: mgr.GetClient(),
+ // The default client will use the DelegatingReader for reads
+ // this forces it to use the cache for unstructured types.
+ Client: client.DelegatingClient{
+ Reader: mgr.GetCache(),
+ Writer: mgr.GetClient(),
+ StatusClient: mgr.GetClient(),
+ },
GVK: options.GVK,
ManagerFactory: options.ManagerFactory,
ReconcilePeriod: options.ReconcilePeriod,
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment