Created
August 6, 2021 13:37
-
-
Save ShubhamRwt/11386bd2f020a75fc775c7e6154f0492 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@SuppressWarnings({"checkstyle:MethodLength", "checkstyle:JavaNCSS"}) | |
public static KafkaCluster fromCrd(Reconciliation reconciliation, Kafka kafkaAssembly, KafkaVersion.Lookup versions, Storage oldStorage, int oldReplicas) { | |
KafkaCluster result = new KafkaCluster(reconciliation, kafkaAssembly); | |
result.setOwnerReference(kafkaAssembly); | |
KafkaSpec kafkaSpec = kafkaAssembly.getSpec(); | |
KafkaClusterSpec kafkaClusterSpec = kafkaSpec.getKafka(); | |
result.setReplicas(kafkaClusterSpec.getReplicas()); | |
validateIntConfigProperty("default.replication.factor", kafkaClusterSpec); | |
validateIntConfigProperty("offsets.topic.replication.factor", kafkaClusterSpec); | |
validateIntConfigProperty("transaction.state.log.replication.factor", kafkaClusterSpec); | |
validateIntConfigProperty("transaction.state.log.min.isr", kafkaClusterSpec); | |
result.setImage(versions.kafkaImage(kafkaClusterSpec.getImage(), kafkaClusterSpec.getVersion())); | |
if (kafkaClusterSpec.getReadinessProbe() != null) { | |
result.setReadinessProbe(kafkaClusterSpec.getReadinessProbe()); | |
} | |
if (kafkaClusterSpec.getLivenessProbe() != null) { | |
result.setLivenessProbe(kafkaClusterSpec.getLivenessProbe()); | |
} | |
result.setRack(kafkaClusterSpec.getRack()); | |
String initImage = kafkaClusterSpec.getBrokerRackInitImage(); | |
if (initImage == null) { | |
initImage = System.getenv().getOrDefault(ClusterOperatorConfig.STRIMZI_DEFAULT_KAFKA_INIT_IMAGE, "quay.io/strimzi/operator:latest"); | |
} | |
result.setInitImage(initImage); | |
Logging logging = kafkaClusterSpec.getLogging(); | |
result.setLogging(logging == null ? new InlineLogging() : logging); | |
result.setGcLoggingEnabled(kafkaClusterSpec.getJvmOptions() == null ? DEFAULT_JVM_GC_LOGGING_ENABLED : kafkaClusterSpec.getJvmOptions().isGcLoggingEnabled()); | |
if (kafkaClusterSpec.getJvmOptions() != null) { | |
result.setJavaSystemProperties(kafkaClusterSpec.getJvmOptions().getJavaSystemProperties()); | |
} | |
result.setJvmOptions(kafkaClusterSpec.getJvmOptions()); | |
if (kafkaClusterSpec.getJmxOptions() != null) { | |
result.setJmxEnabled(Boolean.TRUE); | |
AuthenticationUtils.configureKafkaJmxOptions(kafkaClusterSpec.getJmxOptions().getAuthentication(), result); | |
} | |
// Handle Kafka broker configuration | |
KafkaVersion desiredVersion = versions.version(kafkaClusterSpec.getVersion()); | |
KafkaConfiguration configuration = new KafkaConfiguration(reconciliation, kafkaClusterSpec.getConfig().entrySet()); | |
configureCruiseControlMetrics(kafkaAssembly, result, configuration); | |
validateConfiguration(reconciliation, kafkaAssembly, desiredVersion, configuration); | |
result.setConfiguration(configuration); | |
// Parse different types of metrics configurations | |
ModelUtils.parseMetrics(result, kafkaClusterSpec); | |
if (oldStorage != null) { | |
Storage newStorage = kafkaClusterSpec.getStorage(); | |
AbstractModel.validatePersistentStorage(newStorage); | |
StorageDiff diff = new StorageDiff(reconciliation, oldStorage, newStorage, oldReplicas, kafkaClusterSpec.getReplicas()); | |
if (!diff.isEmpty()) { | |
LOGGER.warnCr(reconciliation, "Only the following changes to Kafka storage are allowed: " + | |
"changing the deleteClaim flag, " + | |
"adding volumes to Jbod storage or removing volumes from Jbod storage, " + | |
"changing overrides to nodes which do not exist yet" + | |
"and increasing size of persistent claim volumes (depending on the volume type and used storage class)."); | |
LOGGER.warnCr(reconciliation, "The desired Kafka storage configuration in the custom resource {}/{} contains changes which are not allowed. As a " + | |
"result, all storage changes will be ignored. Use DEBUG level logging for more information " + | |
"about the detected changes.", kafkaAssembly.getMetadata().getNamespace(), kafkaAssembly.getMetadata().getName()); | |
Condition warning = StatusUtils.buildWarningCondition("KafkaStorage", | |
"The desired Kafka storage configuration contains changes which are not allowed. As a " + | |
"result, all storage changes will be ignored. Use DEBUG level logging for more information " + | |
"about the detected changes."); | |
result.addWarningCondition(warning); | |
result.setStorage(oldStorage); | |
} else { | |
result.setStorage(newStorage); | |
} | |
} else { | |
result.setStorage(kafkaClusterSpec.getStorage()); | |
} | |
result.setDataVolumesClaimsAndMountPaths(result.getStorage()); | |
result.setResources(kafkaClusterSpec.getResources()); | |
// Configure listeners | |
if (kafkaClusterSpec.getListeners() == null || kafkaClusterSpec.getListeners().isEmpty()) { | |
LOGGER.errorCr(reconciliation, "The required field .spec.kafka.listeners is missing"); | |
throw new InvalidResourceException("The required field .spec.kafka.listeners is missing"); | |
} | |
List<GenericKafkaListener> listeners = kafkaClusterSpec.getListeners(); | |
ListenersValidator.validate(reconciliation, kafkaClusterSpec.getReplicas(), listeners); | |
result.setListeners(listeners); | |
// Set authorization | |
if (kafkaClusterSpec.getAuthorization() instanceof KafkaAuthorizationKeycloak) { | |
if (!ListenersUtils.hasListenerWithOAuth(listeners)) { | |
throw new InvalidResourceException("You cannot configure Keycloak Authorization without any listener with OAuth based authentication"); | |
} else { | |
KafkaAuthorizationKeycloak authorizationKeycloak = (KafkaAuthorizationKeycloak) kafkaClusterSpec.getAuthorization(); | |
if (authorizationKeycloak.getClientId() == null || authorizationKeycloak.getTokenEndpointUri() == null) { | |
LOGGER.errorCr(reconciliation, "Keycloak Authorization: Token Endpoint URI and clientId are both required"); | |
throw new InvalidResourceException("Keycloak Authorization: Token Endpoint URI and clientId are both required"); | |
} | |
} | |
} | |
result.setAuthorization(kafkaClusterSpec.getAuthorization()); | |
if (kafkaClusterSpec.getTemplate() != null) { | |
KafkaClusterTemplate template = kafkaClusterSpec.getTemplate(); | |
if (template.getStatefulset() != null) { | |
if (template.getStatefulset().getPodManagementPolicy() != null) { | |
result.templatePodManagementPolicy = template.getStatefulset().getPodManagementPolicy(); | |
} | |
if (template.getStatefulset().getMetadata() != null) { | |
result.templateStatefulSetLabels = template.getStatefulset().getMetadata().getLabels(); | |
result.templateStatefulSetAnnotations = template.getStatefulset().getMetadata().getAnnotations(); | |
} | |
} | |
ModelUtils.parsePodTemplate(result, template.getPod()); | |
ModelUtils.parseInternalServiceTemplate(result, template.getBootstrapService()); | |
ModelUtils.parseInternalHeadlessServiceTemplate(result, template.getBrokersService()); | |
if (template.getExternalBootstrapService() != null) { | |
if (template.getExternalBootstrapService().getMetadata() != null) { | |
result.templateExternalBootstrapServiceLabels = template.getExternalBootstrapService().getMetadata().getLabels(); | |
result.templateExternalBootstrapServiceAnnotations = template.getExternalBootstrapService().getMetadata().getAnnotations(); | |
} | |
} | |
if (template.getPerPodService() != null) { | |
if (template.getPerPodService().getMetadata() != null) { | |
result.templatePerPodServiceLabels = template.getPerPodService().getMetadata().getLabels(); | |
result.templatePerPodServiceAnnotations = template.getPerPodService().getMetadata().getAnnotations(); | |
} | |
} | |
if (template.getExternalBootstrapRoute() != null && template.getExternalBootstrapRoute().getMetadata() != null) { | |
result.templateExternalBootstrapRouteLabels = template.getExternalBootstrapRoute().getMetadata().getLabels(); | |
result.templateExternalBootstrapRouteAnnotations = template.getExternalBootstrapRoute().getMetadata().getAnnotations(); | |
} | |
if (template.getPerPodRoute() != null && template.getPerPodRoute().getMetadata() != null) { | |
result.templatePerPodRouteLabels = template.getPerPodRoute().getMetadata().getLabels(); | |
result.templatePerPodRouteAnnotations = template.getPerPodRoute().getMetadata().getAnnotations(); | |
} | |
if (template.getExternalBootstrapIngress() != null && template.getExternalBootstrapIngress().getMetadata() != null) { | |
result.templateExternalBootstrapIngressLabels = template.getExternalBootstrapIngress().getMetadata().getLabels(); | |
result.templateExternalBootstrapIngressAnnotations = template.getExternalBootstrapIngress().getMetadata().getAnnotations(); | |
} | |
if (template.getPerPodIngress() != null && template.getPerPodIngress().getMetadata() != null) { | |
result.templatePerPodIngressLabels = template.getPerPodIngress().getMetadata().getLabels(); | |
result.templatePerPodIngressAnnotations = template.getPerPodIngress().getMetadata().getAnnotations(); | |
} | |
if (template.getClusterRoleBinding() != null && template.getClusterRoleBinding().getMetadata() != null) { | |
result.templateClusterRoleBindingLabels = template.getClusterRoleBinding().getMetadata().getLabels(); | |
result.templateClusterRoleBindingAnnotations = template.getClusterRoleBinding().getMetadata().getAnnotations(); | |
} | |
if (template.getPersistentVolumeClaim() != null && template.getPersistentVolumeClaim().getMetadata() != null) { | |
result.templatePersistentVolumeClaimLabels = Util.mergeLabelsOrAnnotations(template.getPersistentVolumeClaim().getMetadata().getLabels(), | |
result.templateStatefulSetLabels); | |
result.templatePersistentVolumeClaimAnnotations = template.getPersistentVolumeClaim().getMetadata().getAnnotations(); | |
} | |
if (template.getKafkaContainer() != null && template.getKafkaContainer().getEnv() != null) { | |
result.templateKafkaContainerEnvVars = template.getKafkaContainer().getEnv(); | |
} | |
if (template.getInitContainer() != null && template.getInitContainer().getEnv() != null) { | |
result.templateInitContainerEnvVars = template.getInitContainer().getEnv(); | |
} | |
if (template.getKafkaContainer() != null && template.getKafkaContainer().getSecurityContext() != null) { | |
result.templateKafkaContainerSecurityContext = template.getKafkaContainer().getSecurityContext(); | |
} | |
if (template.getInitContainer() != null && template.getInitContainer().getSecurityContext() != null) { | |
result.templateInitContainerSecurityContext = template.getInitContainer().getSecurityContext(); | |
} | |
if (template.getServiceAccount() != null && template.getServiceAccount().getMetadata() != null) { | |
result.templateServiceAccountLabels = template.getServiceAccount().getMetadata().getLabels(); | |
result.templateServiceAccountAnnotations = template.getServiceAccount().getMetadata().getAnnotations(); | |
} | |
ModelUtils.parsePodDisruptionBudgetTemplate(result, template.getPodDisruptionBudget()); | |
} | |
result.kafkaVersion = versions.version(kafkaClusterSpec.getVersion()); | |
return result; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment