Created
January 30, 2024 11:56
-
-
Save ShubhamRwt/8df93842d5c2a62f58316e6076472c95 to your computer and use it in GitHub Desktop.
KafkaMetadataStateManagerMockTests.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package io.strimzi.operator.cluster.operator.resource; | |
import io.fabric8.kubernetes.api.model.Quantity; | |
import io.fabric8.kubernetes.api.model.ResourceRequirementsBuilder; | |
import io.fabric8.kubernetes.api.model.Secret; | |
import io.strimzi.api.kafka.model.kafka.Kafka; | |
import io.strimzi.api.kafka.model.kafka.KafkaBuilder; | |
import io.strimzi.api.kafka.model.kafka.KafkaResources; | |
import io.strimzi.api.kafka.model.kafka.KafkaStatus; | |
import io.strimzi.api.kafka.model.kafka.PersistentClaimStorageBuilder; | |
import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListenerBuilder; | |
import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerType; | |
import io.strimzi.api.kafka.model.kafka.listener.ListenerStatusBuilder; | |
import io.strimzi.api.kafka.model.nodepool.KafkaNodePool; | |
import io.strimzi.api.kafka.model.nodepool.KafkaNodePoolBuilder; | |
import io.strimzi.api.kafka.model.nodepool.ProcessRoles; | |
import io.strimzi.certs.OpenSslCertManager; | |
import io.strimzi.operator.cluster.ClusterOperatorConfig; | |
import io.strimzi.operator.cluster.KafkaVersionTestUtils; | |
import io.strimzi.operator.cluster.PlatformFeaturesAvailability; | |
import io.strimzi.operator.cluster.ResourceUtils; | |
import io.strimzi.operator.cluster.model.AbstractModel; | |
import io.strimzi.operator.cluster.model.ClusterCa; | |
import io.strimzi.operator.cluster.model.KafkaCluster; | |
import io.strimzi.operator.cluster.model.KafkaPool; | |
import io.strimzi.operator.cluster.model.KafkaVersion; | |
import io.strimzi.operator.cluster.model.KafkaVersionChange; | |
import io.strimzi.operator.cluster.model.MockSharedEnvironmentProvider; | |
import io.strimzi.operator.cluster.model.SharedEnvironmentProvider; | |
import io.strimzi.operator.cluster.model.nodepools.NodePoolUtils; | |
import io.strimzi.operator.cluster.operator.assembly.KafkaAssemblyOperatorWithPoolsTest; | |
import io.strimzi.operator.cluster.operator.assembly.KafkaClusterCreator; | |
import io.strimzi.operator.cluster.operator.assembly.KafkaListenersReconciler; | |
import io.strimzi.operator.cluster.operator.assembly.KafkaReconciler; | |
import io.strimzi.operator.cluster.operator.assembly.KafkaReconcilerStatusTest; | |
import io.strimzi.operator.common.Annotations; | |
import io.strimzi.operator.common.Reconciliation; | |
import io.strimzi.operator.common.ReconciliationLogger; | |
import io.strimzi.operator.common.model.ClientsCa; | |
import io.strimzi.operator.common.model.Labels; | |
import io.strimzi.operator.common.model.PasswordGenerator; | |
import io.strimzi.operator.common.operator.MockCertManager; | |
import io.strimzi.operator.common.operator.resource.SecretOperator; | |
import io.strimzi.platform.KubernetesVersion; | |
import io.vertx.core.Future; | |
import io.vertx.core.Vertx; | |
import io.vertx.junit5.Checkpoint; | |
import io.vertx.junit5.VertxExtension; | |
import io.vertx.junit5.VertxTestContext; | |
import org.junit.jupiter.api.AfterAll; | |
import org.junit.jupiter.api.BeforeAll; | |
import org.junit.jupiter.api.Test; | |
import org.junit.jupiter.api.extension.ExtendWith; | |
import java.time.Clock; | |
import java.util.List; | |
import java.util.Map; | |
import static org.hamcrest.CoreMatchers.is; | |
import static org.hamcrest.MatcherAssert.assertThat; | |
import static org.junit.jupiter.api.Assertions.assertEquals; | |
import static org.mockito.ArgumentMatchers.any; | |
import static org.mockito.ArgumentMatchers.eq; | |
import static org.mockito.Mockito.when; | |
@ExtendWith(VertxExtension.class) | |
public class KafkaMetadataStateManagerMockTest { | |
private static final String NAMESPACE = "my-ns"; | |
private static final String CLUSTER_NAME = "my-cluster"; | |
private static final ClusterOperatorConfig CONFIG = ResourceUtils.dummyClusterOperatorConfig(); | |
private static final KubernetesVersion KUBERNETES_VERSION = KubernetesVersion.MINIMAL_SUPPORTED_VERSION; | |
private static final MockCertManager CERT_MANAGER = new MockCertManager(); | |
private static final PasswordGenerator PASSWORD_GENERATOR = new PasswordGenerator(10, "a", "a"); | |
private static final KafkaVersion.Lookup VERSIONS = KafkaVersionTestUtils.getKafkaVersionLookup(); | |
private final static KafkaVersionChange VERSION_CHANGE = new KafkaVersionChange( | |
VERSIONS.defaultVersion(), | |
VERSIONS.defaultVersion(), | |
VERSIONS.defaultVersion().protocolVersion(), | |
VERSIONS.defaultVersion().messageVersion(), | |
VERSIONS.defaultVersion().metadataVersion() | |
); | |
private static final Reconciliation RECONCILIATION = new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, NAMESPACE, CLUSTER_NAME); | |
private static final SharedEnvironmentProvider SHARED_ENV_PROVIDER = new MockSharedEnvironmentProvider(); | |
protected static Vertx vertx; | |
@BeforeAll | |
public static void before() { | |
vertx = Vertx.vertx(); | |
} | |
@AfterAll | |
public static void after() { | |
vertx.close(); | |
} | |
private final static ClusterCa CLUSTER_CA = new ClusterCa( | |
Reconciliation.DUMMY_RECONCILIATION, | |
CERT_MANAGER, | |
PASSWORD_GENERATOR, | |
CLUSTER_NAME, | |
ResourceUtils.createInitialCaCertSecret(NAMESPACE, CLUSTER_NAME, AbstractModel.clusterCaCertSecretName(CLUSTER_NAME), MockCertManager.clusterCaCert(), MockCertManager.clusterCaCertStore(), "123456"), | |
ResourceUtils.createInitialCaKeySecret(NAMESPACE, CLUSTER_NAME, AbstractModel.clusterCaKeySecretName(CLUSTER_NAME), MockCertManager.clusterCaKey()) | |
); | |
private final static ClientsCa CLIENTS_CA = new ClientsCa( | |
Reconciliation.DUMMY_RECONCILIATION, | |
new OpenSslCertManager(), | |
new PasswordGenerator(10, "a", "a"), | |
KafkaResources.clientsCaCertificateSecretName(CLUSTER_NAME), | |
ResourceUtils.createInitialCaCertSecret(NAMESPACE, CLUSTER_NAME, AbstractModel.clusterCaCertSecretName(CLUSTER_NAME), MockCertManager.clusterCaCert(), MockCertManager.clusterCaCertStore(), "123456"), | |
KafkaResources.clientsCaKeySecretName(CLUSTER_NAME), | |
ResourceUtils.createInitialCaKeySecret(NAMESPACE, CLUSTER_NAME, AbstractModel.clusterCaKeySecretName(CLUSTER_NAME), MockCertManager.clusterCaKey()), | |
365, | |
30, | |
true, | |
null | |
); | |
private static final Kafka KAFKA = new KafkaBuilder() | |
.withNewMetadata() | |
.withName(CLUSTER_NAME) | |
.withNamespace(NAMESPACE) | |
.withAnnotations(Map.of(Annotations.ANNO_STRIMZI_IO_NODE_POOLS, "enabled", Annotations.ANNO_STRIMZI_IO_KRAFT, "migration")) | |
.endMetadata() | |
.withNewSpec() | |
.withNewKafka() | |
.withListeners(new GenericKafkaListenerBuilder() | |
.withName("plain") | |
.withPort(9092) | |
.withType(KafkaListenerType.INTERNAL) | |
.withTls(false) | |
.build()) | |
.endKafka() | |
.endSpec() | |
.withNewStatus() | |
.withKafkaMetadataState("ZooKeeper") | |
.endStatus() | |
.build(); | |
private final static KafkaNodePool CONTROLLERS = new KafkaNodePoolBuilder() | |
.withNewMetadata() | |
.withName("controllers") | |
.withNamespace(NAMESPACE) | |
.withGeneration(1L) | |
.withLabels(Map.of(Labels.STRIMZI_CLUSTER_LABEL, CLUSTER_NAME)) | |
.endMetadata() | |
.withNewSpec() | |
.withReplicas(3) | |
.withNewJbodStorage() | |
.withVolumes(new PersistentClaimStorageBuilder().withId(0).withSize("100Gi").build()) | |
.endJbodStorage() | |
.withRoles(ProcessRoles.CONTROLLER) | |
.withResources(new ResourceRequirementsBuilder().withRequests(Map.of("cpu", new Quantity("4"))).build()) | |
.endSpec() | |
.build(); | |
private final static KafkaNodePool BROKERS = new KafkaNodePoolBuilder() | |
.withNewMetadata() | |
.withName("brokers") | |
.withNamespace(NAMESPACE) | |
.withGeneration(1L) | |
.withLabels(Map.of(Labels.STRIMZI_CLUSTER_LABEL, CLUSTER_NAME)) | |
.endMetadata() | |
.withNewSpec() | |
.withReplicas(3) | |
.withNewJbodStorage() | |
.withVolumes(new PersistentClaimStorageBuilder().withId(0).withSize("200Gi").build()) | |
.endJbodStorage() | |
.withRoles(ProcessRoles.BROKER) | |
.withResources(new ResourceRequirementsBuilder().withRequests(Map.of("cpu", new Quantity("6"))).build()) | |
.endSpec() | |
.build(); | |
private static final List<KafkaPool> POOLS = NodePoolUtils.createKafkaPools(Reconciliation.DUMMY_RECONCILIATION, KAFKA, List.of(CONTROLLERS, BROKERS), Map.of(), Map.of(), true, SHARED_ENV_PROVIDER); | |
private static final KafkaCluster KAFKA_CLUSTER = KafkaCluster.fromCrd(Reconciliation.DUMMY_RECONCILIATION, KAFKA, POOLS, VERSIONS, KafkaVersionTestUtils.DEFAULT_KRAFT_VERSION_CHANGE, KafkaMetadataConfigurationState.PRE_MIGRATION, null, SHARED_ENV_PROVIDER); | |
@Test | |
public void testZookeeperToKraftMigrationWhenMigrationAnnotationEnabled(VertxTestContext context) { | |
ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); | |
SecretOperator mockSecretOps = supplier.secretOperations; | |
Secret secret = new Secret(); | |
when(mockSecretOps.getAsync(eq(NAMESPACE), eq(KafkaResources.clusterCaCertificateSecretName(CLUSTER_NAME)))).thenReturn(Future.succeededFuture(secret)); | |
when(mockSecretOps.getAsync(eq(NAMESPACE), eq(KafkaResources.secretName(CLUSTER_NAME)))).thenReturn(Future.succeededFuture(secret)); | |
MockKafkaReconciler kr = new MockKafkaReconciler( | |
new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, NAMESPACE, CLUSTER_NAME), | |
vertx, | |
CONFIG, | |
supplier, | |
new PlatformFeaturesAvailability(false, KUBERNETES_VERSION), | |
KAFKA, | |
List.of(BROKERS, CONTROLLERS), | |
KAFKA_CLUSTER, | |
CLUSTER_CA, | |
CLIENTS_CA); | |
KafkaStatus status = new KafkaStatus(); | |
Checkpoint async = context.checkpoint(); | |
kr.reconcile(status, Clock.systemUTC()).onComplete(res -> context.verify(() -> { | |
assertThat(res.succeeded(), is(true)); | |
assertEquals(status.getKafkaMetadataState(), "KRaftMigration"); | |
async.flag(); | |
})); | |
} | |
static class MockKafkaReconciler extends KafkaReconciler { | |
private static final ReconciliationLogger LOGGER = ReconciliationLogger.create(MockKafkaReconciler.class.getName()); | |
public MockKafkaReconciler(Reconciliation reconciliation, Vertx vertx, ClusterOperatorConfig config, ResourceOperatorSupplier supplier, PlatformFeaturesAvailability pfa, Kafka kafkaAssembly, List<KafkaNodePool> nodePools, KafkaCluster kafkaCluster, ClusterCa clusterCa, ClientsCa clientsCa) { | |
super(reconciliation, kafkaAssembly, nodePools, kafkaCluster, clusterCa, clientsCa, config, supplier, pfa, vertx, new KafkaMetadataStateManager(reconciliation, kafkaAssembly, config.featureGates().useKRaftEnabled())); | |
} | |
@Override | |
public Future<Void> reconcile(KafkaStatus kafkaStatus, Clock clock) { | |
return modelWarnings(kafkaStatus) | |
.compose(i -> clusterId(kafkaStatus)) | |
.compose(i -> updateKafkaVersion(kafkaStatus)) | |
.compose(i -> updateKafkaMetadataMigrationState()) | |
.compose(i -> updateKafkaMetadataState(kafkaStatus)) | |
.recover(error -> { | |
LOGGER.errorCr(RECONCILIATION, "Reconciliation failed", error); | |
return Future.failedFuture(error); | |
}); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment