Created
January 19, 2022 16:04
-
-
Save scholzj/959c5811ca8687994f03ca13b0f95d95 to your computer and use it in GitHub Desktop.
Bootstrap LB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperator.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperator.java | |
index 1fd88a77c..a22edda5c 100644 | |
--- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperator.java | |
+++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperator.java | |
@@ -2056,29 +2056,29 @@ public class KafkaAssemblyOperator extends AbstractAssemblyOperator<KubernetesCl | |
List<String> listenerAddressList = new ArrayList<>(kafkaCluster.getReplicas() + 1); | |
Future<Void> perListenerFut; | |
- if (!ListenersUtils.skipCreateBootstrapService(listener)) { | |
- perListenerFut = Future.succeededFuture(); | |
- } else { | |
- perListenerFut = serviceOperations.hasIngressAddress(reconciliation, namespace, bootstrapServiceName, 1_000, operationTimeoutMs) | |
- .compose(res -> serviceOperations.getAsync(namespace, bootstrapServiceName)) | |
- .compose(svc -> { | |
- String bootstrapAddress; | |
+ perListenerFut = Future.succeededFuture().compose(i -> { | |
+ if (ListenersUtils.skipCreateBootstrapService(listener)) { | |
+ return Future.succeededFuture(); | |
+ } else { | |
+ return serviceOperations.hasIngressAddress(reconciliation, namespace, bootstrapServiceName, 1_000, operationTimeoutMs) | |
+ .compose(res -> serviceOperations.getAsync(namespace, bootstrapServiceName)) | |
+ .compose(svc -> { | |
+ String bootstrapAddress; | |
- if (svc.getStatus().getLoadBalancer().getIngress().get(0).getHostname() != null) { | |
- bootstrapAddress = svc.getStatus().getLoadBalancer().getIngress().get(0).getHostname(); | |
- } else { | |
- bootstrapAddress = svc.getStatus().getLoadBalancer().getIngress().get(0).getIp(); | |
- } | |
+ if (svc.getStatus().getLoadBalancer().getIngress().get(0).getHostname() != null) { | |
+ bootstrapAddress = svc.getStatus().getLoadBalancer().getIngress().get(0).getHostname(); | |
+ } else { | |
+ bootstrapAddress = svc.getStatus().getLoadBalancer().getIngress().get(0).getIp(); | |
+ } | |
- LOGGER.debugCr(reconciliation, "Found address {} for Service {}", bootstrapAddress, bootstrapServiceName); | |
+ LOGGER.debugCr(reconciliation, "Found address {} for Service {}", bootstrapAddress, bootstrapServiceName); | |
- kafkaBootstrapDnsName.add(bootstrapAddress); | |
- listenerAddressList.add(bootstrapAddress); | |
- return Future.succeededFuture(); | |
- }); | |
- } | |
- | |
- perListenerFut.compose(res -> { | |
+ kafkaBootstrapDnsName.add(bootstrapAddress); | |
+ listenerAddressList.add(bootstrapAddress); | |
+ return Future.succeededFuture(); | |
+ }); | |
+ } | |
+ }).compose(res -> { | |
List<Future> perPodFutures = new ArrayList<>(kafkaCluster.getReplicas()); | |
for (int pod = 0; pod < kafkaCluster.getReplicas(); pod++) { |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment