@ShubhamRwt
Created January 30, 2024 11:56

KafkaMetadataStateManagerMockTest.java

package io.strimzi.operator.cluster.operator.resource;

import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.api.model.ResourceRequirementsBuilder;
import io.fabric8.kubernetes.api.model.Secret;
import io.strimzi.api.kafka.model.kafka.Kafka;
import io.strimzi.api.kafka.model.kafka.KafkaBuilder;
import io.strimzi.api.kafka.model.kafka.KafkaResources;
import io.strimzi.api.kafka.model.kafka.KafkaStatus;
import io.strimzi.api.kafka.model.kafka.PersistentClaimStorageBuilder;
import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListenerBuilder;
import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerType;
import io.strimzi.api.kafka.model.nodepool.KafkaNodePool;
import io.strimzi.api.kafka.model.nodepool.KafkaNodePoolBuilder;
import io.strimzi.api.kafka.model.nodepool.ProcessRoles;
import io.strimzi.certs.OpenSslCertManager;
import io.strimzi.operator.cluster.ClusterOperatorConfig;
import io.strimzi.operator.cluster.KafkaVersionTestUtils;
import io.strimzi.operator.cluster.PlatformFeaturesAvailability;
import io.strimzi.operator.cluster.ResourceUtils;
import io.strimzi.operator.cluster.model.AbstractModel;
import io.strimzi.operator.cluster.model.ClusterCa;
import io.strimzi.operator.cluster.model.KafkaCluster;
import io.strimzi.operator.cluster.model.KafkaPool;
import io.strimzi.operator.cluster.model.KafkaVersion;
import io.strimzi.operator.cluster.model.KafkaVersionChange;
import io.strimzi.operator.cluster.model.MockSharedEnvironmentProvider;
import io.strimzi.operator.cluster.model.SharedEnvironmentProvider;
import io.strimzi.operator.cluster.model.nodepools.NodePoolUtils;
import io.strimzi.operator.cluster.operator.assembly.KafkaReconciler;
import io.strimzi.operator.common.Annotations;
import io.strimzi.operator.common.Reconciliation;
import io.strimzi.operator.common.ReconciliationLogger;
import io.strimzi.operator.common.model.ClientsCa;
import io.strimzi.operator.common.model.Labels;
import io.strimzi.operator.common.model.PasswordGenerator;
import io.strimzi.operator.common.operator.MockCertManager;
import io.strimzi.operator.common.operator.resource.SecretOperator;
import io.strimzi.platform.KubernetesVersion;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.junit5.Checkpoint;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import java.time.Clock;
import java.util.List;
import java.util.Map;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;
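
/**
 * Mock-based test of the ZooKeeper-to-KRaft migration handling provided by {@link KafkaMetadataStateManager},
 * driven through a stripped-down {@link KafkaReconciler}.
 */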
@ExtendWith(VertxExtension.class)
public class KafkaMetadataStateManagerMockTest {
    private static final String NAMESPACE = "my-ns";
    private static final String CLUSTER_NAME = "my-cluster";
    private static final ClusterOperatorConfig CONFIG = ResourceUtils.dummyClusterOperatorConfig();
    private static final KubernetesVersion KUBERNETES_VERSION = KubernetesVersion.MINIMAL_SUPPORTED_VERSION;
    private static final MockCertManager CERT_MANAGER = new MockCertManager();
    private static final PasswordGenerator PASSWORD_GENERATOR = new PasswordGenerator(10, "a", "a");
    private static final KafkaVersion.Lookup VERSIONS = KafkaVersionTestUtils.getKafkaVersionLookup();
    private static final KafkaVersionChange VERSION_CHANGE = new KafkaVersionChange(
            VERSIONS.defaultVersion(),
            VERSIONS.defaultVersion(),
            VERSIONS.defaultVersion().protocolVersion(),
            VERSIONS.defaultVersion().messageVersion(),
            VERSIONS.defaultVersion().metadataVersion()
    );
    private static final Reconciliation RECONCILIATION = new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, NAMESPACE, CLUSTER_NAME);
    private static final SharedEnvironmentProvider SHARED_ENV_PROVIDER = new MockSharedEnvironmentProvider();

    protected static Vertx vertx;

    @BeforeAll
    public static void before() {
        vertx = Vertx.vertx();
    }

    @AfterAll
    public static void after() {
        vertx.close();
    }
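
    // Cluster CA (mock cert manager) and clients CA (OpenSSL cert manager), both backed by pre-created CA secrets from ResourceUtils.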
    private static final ClusterCa CLUSTER_CA = new ClusterCa(
            Reconciliation.DUMMY_RECONCILIATION,
            CERT_MANAGER,
            PASSWORD_GENERATOR,
            CLUSTER_NAME,
            ResourceUtils.createInitialCaCertSecret(NAMESPACE, CLUSTER_NAME, AbstractModel.clusterCaCertSecretName(CLUSTER_NAME), MockCertManager.clusterCaCert(), MockCertManager.clusterCaCertStore(), "123456"),
            ResourceUtils.createInitialCaKeySecret(NAMESPACE, CLUSTER_NAME, AbstractModel.clusterCaKeySecretName(CLUSTER_NAME), MockCertManager.clusterCaKey())
    );

    private static final ClientsCa CLIENTS_CA = new ClientsCa(
            Reconciliation.DUMMY_RECONCILIATION,
            new OpenSslCertManager(),
            new PasswordGenerator(10, "a", "a"),
            KafkaResources.clientsCaCertificateSecretName(CLUSTER_NAME),
            ResourceUtils.createInitialCaCertSecret(NAMESPACE, CLUSTER_NAME, AbstractModel.clusterCaCertSecretName(CLUSTER_NAME), MockCertManager.clusterCaCert(), MockCertManager.clusterCaCertStore(), "123456"),
            KafkaResources.clientsCaKeySecretName(CLUSTER_NAME),
            ResourceUtils.createInitialCaKeySecret(NAMESPACE, CLUSTER_NAME, AbstractModel.clusterCaKeySecretName(CLUSTER_NAME), MockCertManager.clusterCaKey()),
            365,
            30,
            true,
            null
    );
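
    // Kafka CR with node pools enabled and the strimzi.io/kraft annotation set to "migration",
    // while the status still reports the metadata state as "ZooKeeper" (the migration has not started yet).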
    private static final Kafka KAFKA = new KafkaBuilder()
            .withNewMetadata()
                .withName(CLUSTER_NAME)
                .withNamespace(NAMESPACE)
                .withAnnotations(Map.of(Annotations.ANNO_STRIMZI_IO_NODE_POOLS, "enabled", Annotations.ANNO_STRIMZI_IO_KRAFT, "migration"))
            .endMetadata()
            .withNewSpec()
                .withNewKafka()
                    .withListeners(new GenericKafkaListenerBuilder()
                            .withName("plain")
                            .withPort(9092)
                            .withType(KafkaListenerType.INTERNAL)
                            .withTls(false)
                            .build())
                .endKafka()
            .endSpec()
            .withNewStatus()
                .withKafkaMetadataState("ZooKeeper")
            .endStatus()
            .build();
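
    // Node pool with three dedicated controller nodes.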
    private static final KafkaNodePool CONTROLLERS = new KafkaNodePoolBuilder()
            .withNewMetadata()
                .withName("controllers")
                .withNamespace(NAMESPACE)
                .withGeneration(1L)
                .withLabels(Map.of(Labels.STRIMZI_CLUSTER_LABEL, CLUSTER_NAME))
            .endMetadata()
            .withNewSpec()
                .withReplicas(3)
                .withNewJbodStorage()
                    .withVolumes(new PersistentClaimStorageBuilder().withId(0).withSize("100Gi").build())
                .endJbodStorage()
                .withRoles(ProcessRoles.CONTROLLER)
                .withResources(new ResourceRequirementsBuilder().withRequests(Map.of("cpu", new Quantity("4"))).build())
            .endSpec()
            .build();
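
    // Node pool with three dedicated broker nodes.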
    private static final KafkaNodePool BROKERS = new KafkaNodePoolBuilder()
            .withNewMetadata()
                .withName("brokers")
                .withNamespace(NAMESPACE)
                .withGeneration(1L)
                .withLabels(Map.of(Labels.STRIMZI_CLUSTER_LABEL, CLUSTER_NAME))
            .endMetadata()
            .withNewSpec()
                .withReplicas(3)
                .withNewJbodStorage()
                    .withVolumes(new PersistentClaimStorageBuilder().withId(0).withSize("200Gi").build())
                .endJbodStorage()
                .withRoles(ProcessRoles.BROKER)
                .withResources(new ResourceRequirementsBuilder().withRequests(Map.of("cpu", new Quantity("6"))).build())
            .endSpec()
            .build();
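
    // Kafka cluster model built from the CR and node pools, with the metadata configuration in the PRE_MIGRATION state.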
    private static final List<KafkaPool> POOLS = NodePoolUtils.createKafkaPools(Reconciliation.DUMMY_RECONCILIATION, KAFKA, List.of(CONTROLLERS, BROKERS), Map.of(), Map.of(), true, SHARED_ENV_PROVIDER);
    private static final KafkaCluster KAFKA_CLUSTER = KafkaCluster.fromCrd(Reconciliation.DUMMY_RECONCILIATION, KAFKA, POOLS, VERSIONS, KafkaVersionTestUtils.DEFAULT_KRAFT_VERSION_CHANGE, KafkaMetadataConfigurationState.PRE_MIGRATION, null, SHARED_ENV_PROVIDER);
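
    /**
     * Verifies that, when the strimzi.io/kraft annotation is set to "migration" and the cluster is still
     * ZooKeeper-based, a reconciliation moves the Kafka status metadata state to "KRaftMigration".
     */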
    @Test
    public void testZookeeperToKraftMigrationWhenMigrationAnnotationEnabled(VertxTestContext context) {
        ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false);
        SecretOperator mockSecretOps = supplier.secretOperations;
        Secret secret = new Secret();
        when(mockSecretOps.getAsync(eq(NAMESPACE), eq(KafkaResources.clusterCaCertificateSecretName(CLUSTER_NAME)))).thenReturn(Future.succeededFuture(secret));
        when(mockSecretOps.getAsync(eq(NAMESPACE), eq(KafkaResources.secretName(CLUSTER_NAME)))).thenReturn(Future.succeededFuture(secret));

        MockKafkaReconciler kr = new MockKafkaReconciler(
                new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, NAMESPACE, CLUSTER_NAME),
                vertx,
                CONFIG,
                supplier,
                new PlatformFeaturesAvailability(false, KUBERNETES_VERSION),
                KAFKA,
                List.of(BROKERS, CONTROLLERS),
                KAFKA_CLUSTER,
                CLUSTER_CA,
                CLIENTS_CA);

        KafkaStatus status = new KafkaStatus();

        Checkpoint async = context.checkpoint();
        kr.reconcile(status, Clock.systemUTC()).onComplete(res -> context.verify(() -> {
            assertThat(res.succeeded(), is(true));
            assertEquals("KRaftMigration", status.getKafkaMetadataState());
            async.flag();
        }));
    }
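
    /**
     * {@link KafkaReconciler} subclass whose reconcile() runs only the steps relevant to the metadata state,
     * so the state transition itself is handled by the real {@link KafkaMetadataStateManager} passed to the superclass.
     */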
    static class MockKafkaReconciler extends KafkaReconciler {
        private static final ReconciliationLogger LOGGER = ReconciliationLogger.create(MockKafkaReconciler.class.getName());

        public MockKafkaReconciler(Reconciliation reconciliation, Vertx vertx, ClusterOperatorConfig config, ResourceOperatorSupplier supplier, PlatformFeaturesAvailability pfa, Kafka kafkaAssembly, List<KafkaNodePool> nodePools, KafkaCluster kafkaCluster, ClusterCa clusterCa, ClientsCa clientsCa) {
            super(reconciliation, kafkaAssembly, nodePools, kafkaCluster, clusterCa, clientsCa, config, supplier, pfa, vertx, new KafkaMetadataStateManager(reconciliation, kafkaAssembly, config.featureGates().useKRaftEnabled()));
        }

        @Override
        public Future<Void> reconcile(KafkaStatus kafkaStatus, Clock clock) {
            return modelWarnings(kafkaStatus)
                    .compose(i -> clusterId(kafkaStatus))
                    .compose(i -> updateKafkaVersion(kafkaStatus))
                    .compose(i -> updateKafkaMetadataMigrationState())
                    .compose(i -> updateKafkaMetadataState(kafkaStatus))
                    .recover(error -> {
                        LOGGER.errorCr(RECONCILIATION, "Reconciliation failed", error);
                        return Future.failedFuture(error);
                    });
        }
    }
}