Skip to content

Instantly share code, notes, and snippets.

@pintohutch
Created November 17, 2022 01:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pintohutch/738bcd04dbcae625e248eaf832e45880 to your computer and use it in GitHub Desktop.
Save pintohutch/738bcd04dbcae625e248eaf832e45880 to your computer and use it in GitHub Desktop.
diff --git a/pkg/operator/target_status.go b/pkg/operator/target_status.go
index 1a83256e4..c01d92fcf 100644
--- a/pkg/operator/target_status.go
+++ b/pkg/operator/target_status.go
@@ -34,6 +34,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
+ "k8s.io/utils/clock"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -59,62 +60,23 @@ var (
}, []string{})
)
-type TimerBuilder interface {
- Create() Timer
-}
-
-// Wraps a time.Timer so we can replace for unit testing.
-type Timer interface {
- // Returns the channel receiving ticks.
- Channel() <-chan time.Time
-}
-
-func NewTimerBuilder(d time.Duration) TimerBuilder {
- return &timeTimerBuilder{
- d: d,
- }
-}
-
-type timeTimerBuilder struct {
- d time.Duration
-}
-
-func (f *timeTimerBuilder) Create() Timer {
- return NewTimer(f.d)
-}
-
-type timeTimer time.Timer
-
-func NewTimer(d time.Duration) Timer {
- data := timeTimer(*time.NewTimer(d))
- return &data
-}
-
-func (t *timeTimer) Channel() <-chan time.Time {
- timer := time.Timer(*t)
- return timer.C
-}
-
// Responsible for fetching the targets given a pod.
type getTargetFn func(ctx context.Context, logger logr.Logger, port int32, pod *corev1.Pod) (*prometheusv1.TargetsResult, error)
// targetStatusReconciler to hold cached client state and source channel.
type targetStatusReconciler struct {
- ch chan<- event.GenericEvent
- opts Options
- getTarget getTargetFn
- timerBuilder TimerBuilder
- logger logr.Logger
- kubeClient client.Client
+ ch chan<- event.GenericEvent
+ opts Options
+ getTarget getTargetFn
+ clock clock.Clock
+ logger logr.Logger
+ kubeClient client.Client
}
// newTargetStatusReconciler creates a new targetStatusReconciler and kicks off
// the reconcile loop by sending the first event to the source channel.
-func setupTargetStatusPoller(o *Operator, registry prometheus.Registerer) error {
- return setupTargetStatusReconciler(o.logger, o.opts, registry, getTarget, o.manager, NewTimerBuilder(10*time.Second))
-}
-
-func setupTargetStatusReconciler(logger logr.Logger, opts Options, registry prometheus.Registerer, getTarget getTargetFn, kubeManager manager.Manager, timerBuilder TimerBuilder) error {
+func setupTargetStatusPoller(op *Operator, registry prometheus.Registerer) error {
+ // TODO: remove this leftover note. Arguments of the former setupTargetStatusReconciler call: o.logger, o.opts, registry, getTarget, o.manager, NewTimerBuilder(10*time.Second)
if err := registry.Register(targetStatusDuration); err != nil {
return err
}
@@ -122,15 +84,15 @@ func setupTargetStatusReconciler(logger logr.Logger, opts Options, registry prom
ch := make(chan event.GenericEvent, 1)
reconciler := &targetStatusReconciler{
- ch: ch,
- opts: opts,
- getTarget: getTarget,
- logger: logger,
- kubeClient: kubeManager.GetClient(),
- timerBuilder: timerBuilder,
+ ch: ch,
+ opts: op.opts,
+ getTarget: getTarget,
+ logger: op.logger,
+ kubeClient: op.manager.GetClient(),
+ clock: clock.RealClock{},
}
- err := ctrl.NewControllerManagedBy(kubeManager).
+ err := ctrl.NewControllerManagedBy(op.manager).
Named("target-status").
// This is a required field but we only watch to the channel.
For(
@@ -149,7 +111,7 @@ func setupTargetStatusReconciler(logger logr.Logger, opts Options, registry prom
}
// Start the controller only once.
- kubeManager.Add(manager.RunnableFunc(func(ctx context.Context) error {
+ op.manager.Add(manager.RunnableFunc(func(ctx context.Context) error {
reconciler.ch <- event.GenericEvent{
Object: &appsv1.DaemonSet{},
}
@@ -162,7 +124,7 @@ func setupTargetStatusReconciler(logger logr.Logger, opts Options, registry prom
// Reconcile polls the collector pods, fetches and aggregates target status and
// upserts into each PodMonitoring's Status field.
func (r *targetStatusReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
- timer := r.timerBuilder.Create()
+ timer := r.clock.NewTimer(10 * time.Second)
now := time.Now()
if err := poll(ctx, r.logger, r.opts, r.getTarget, r.kubeClient); err != nil {
@@ -177,7 +139,7 @@ func (r *targetStatusReconciler) Reconcile(ctx context.Context, request reconcil
select {
case <-ctx.Done():
break
- case <-timer.Channel():
+ case <-timer.C():
r.ch <- event.GenericEvent{
Object: &appsv1.DaemonSet{},
}
diff --git a/pkg/operator/target_status_test.go b/pkg/operator/target_status_test.go
index c55c2e6d4..5332ada94 100644
--- a/pkg/operator/target_status_test.go
+++ b/pkg/operator/target_status_test.go
@@ -36,6 +36,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
+ tclock "k8s.io/utils/clock/testing"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -1120,39 +1121,6 @@ func TestTargetStatusConversion(t *testing.T) {
}
}
-type testTimerBuilder struct {
- timer *testTimer
-}
-
-func (b *testTimerBuilder) Create() Timer {
- return b.timer
-}
-
-func newTestTimerBuilder(timer *testTimer) TimerBuilder {
- return &testTimerBuilder{
- timer: timer,
- }
-}
-
-type testTimer struct {
- c chan time.Time
-}
-
-func newTestTimer() *testTimer {
- return &testTimer{
- // Expect a single tick at a time.
- c: make(chan time.Time, 0),
- }
-}
-
-func (t *testTimer) Channel() <-chan time.Time {
- return t.c
-}
-
-func (t *testTimer) tick() {
- t.c <- time.Time{}
-}
-
func getPodKey(pod *corev1.Pod, port int32) string {
return fmt.Sprintf("%s:%d", pod.Status.PodIP, port)
}
@@ -1178,7 +1146,7 @@ func TestPolling(t *testing.T) {
t.Fatal("Invalid options:", err)
}
- timer := newTestTimer()
+ fakeClock := tclock.NewFakeClock(time.Now())
scheme, err := getScheme()
if err != nil {
@@ -1256,12 +1224,12 @@ func TestPolling(t *testing.T) {
ch := make(chan event.GenericEvent, 1)
reconciler := &targetStatusReconciler{
- ch: ch,
- opts: opts,
- getTarget: targetFetcher.getTarget,
- logger: logger,
- kubeClient: kubeClient,
- timerBuilder: newTestTimerBuilder(timer),
+ ch: ch,
+ opts: opts,
+ getTarget: targetFetcher.getTarget,
+ logger: logger,
+ kubeClient: kubeClient,
+ clock: fakeClock,
}
expectStatus := func(t *testing.T, description string, expected []monitoringv1.ScrapeEndpointStatus) {
@@ -1313,7 +1281,7 @@ func TestPolling(t *testing.T) {
}()
// First tick.
- timer.tick()
+ fakeClock.Step(10 * time.Second)
statusTick1 := []v1.ScrapeEndpointStatus{
{
Name: "PodMonitoring/gmp-test/prom-example-1/metrics",
@@ -1347,7 +1315,7 @@ func TestPolling(t *testing.T) {
expectStatus(t, "first wait", statusTick1)
// Second tick.
- timer.tick()
+ fakeClock.Step(10 * time.Second)
statusTick2 := []v1.ScrapeEndpointStatus{
{
Name: "PodMonitoring/gmp-test/prom-example-1/metrics",
@@ -1380,7 +1348,7 @@ func TestPolling(t *testing.T) {
// We didn't tick yet so we don't expect a change yet.
expectStatus(t, "second wait", statusTick2)
- timer.tick()
+ fakeClock.Step(10 * time.Second)
statusTick3 := []v1.ScrapeEndpointStatus{
{
Name: "PodMonitoring/gmp-test/prom-example-1/metrics",
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment