-
-
Save mmanco/fce3cec41adad1a314fd72b3b5095dc6 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class AtomixClusterFactory implements ClusterFactory<Atomix> { | |
private static Logger logger = LoggerFactory.getLogger(ClusterFactory.class); | |
private SchedulingService schedulingService; | |
private ClusterSettings settings; | |
private Function<ClusterSettings, Atomix> builder; | |
private CompletableFuture<Atomix> cluster; | |
private CompletableFuture<Atomix> bootstrap; | |
private volatile ScheduledFuture<?> bootstrapMonitor; | |
private volatile long bootstrapStartTime; | |
AtomixClusterFactory(SchedulingService schedulingService, ClusterSettings settings, | |
Function<ClusterSettings, Atomix> builder) { | |
this.schedulingService = schedulingService; | |
this.settings = settings; | |
this.builder = builder; | |
this.bootstrap = new CompletableFuture<>(); | |
} | |
@PostConstruct | |
public void initialize() { | |
if (settings.isBootstrapValidationEnabled()) { | |
bootstrapStartTime = System.currentTimeMillis(); | |
bootstrapMonitor = | |
schedulingService.schedule(this::bootstrapCluster, | |
settings.getBootstrapValidation().getScheduling()); | |
} else { | |
bootstrap.complete(builder.apply(settings)); | |
} | |
cluster = bootstrap | |
.thenCompose((atomix -> atomix.start().thenApply((signal) -> atomix))) | |
.exceptionally((e) -> { | |
logger.error("Cluster bootstrap failed", e); | |
return null; | |
}); | |
} | |
@Override | |
public CompletableFuture<Atomix> create() { | |
return cluster; | |
} | |
private void bootstrapCluster() { | |
boolean valid = isValidNode(settings.getNode()) && | |
settings.getBootstrap().stream().allMatch(this::isValidNode); | |
long elapsedTime = System.currentTimeMillis() - bootstrapStartTime; | |
if (valid) { | |
logger.info("All cluster nodes are reachable"); | |
bootstrap.complete(builder.apply(settings)); | |
bootstrapMonitor.cancel(true); | |
} else if (elapsedTime >= settings.getBootstrapValidation().getDurationInMillis()) { | |
logger.warn("Cluster nodes were not reachable within {}", | |
formatDurationWords(settings.getBootstrapValidation().getDurationInMillis(), true, true)); | |
bootstrap.completeExceptionally(new RuntimeException("Cluster nodes not reachable")); | |
bootstrapMonitor.cancel(true); | |
} | |
} | |
private boolean isValidNode(ClusterNodeSettings node) { | |
boolean valid = true; | |
try { | |
Address.from(node.getAddress()); | |
} catch (MalformedAddressException e) { | |
valid = false; | |
Throwable cause = e.getCause(); | |
if (cause instanceof UnknownHostException) { | |
logger.warn("Node {} with address {} is not reachable", node.getId(), node.getAddress()); | |
} else { | |
logger.error("Node {} width address is not valid", node.getId(), node.getAddress(), e); | |
bootstrap.completeExceptionally(e); | |
bootstrapMonitor.cancel(true); | |
} | |
} | |
return valid; | |
} | |
static Atomix buildAtomix(ClusterSettings settings) { | |
AtomixBuilder builder = Atomix.builder() | |
.withMemberId(settings.getNode().getId()) | |
.withAddress(settings.getNode().getAddress()) | |
.withShutdownHookEnabled(); | |
if (settings.getBootstrap().isEmpty()) { | |
builder.withMulticastEnabled(); | |
} else { | |
builder | |
.withMembershipProvider(BootstrapDiscoveryProvider | |
.builder() | |
.withNodes(settings.getBootstrap() | |
.stream() | |
.map(bootstrapNode -> Node.builder() | |
.withId(bootstrapNode.getId()) | |
.withAddress(bootstrapNode.getAddress()) | |
.build()) | |
.collect(Collectors.toList())) | |
.build()); | |
} | |
if (settings.isBootstrapNode()) { | |
ClusterSettings.Protocol managementGroupProtocol = | |
settings.getManagementPartition().getProtocol(); | |
ClusterSettings.Protocol dataGroupProtocol = | |
settings.getDataPartition().getProtocol(); | |
builder | |
.withManagementGroup(managementGroupProtocol == ClusterSettings.Protocol.Raft ? | |
raftPartitionGroup("system", settings.getBootstrap(), 1) : | |
primaryBackupPartitionGroup("system", 1)) | |
.withPartitionGroups(dataGroupProtocol == ClusterSettings.Protocol.Raft ? | |
raftPartitionGroup("data", settings.getBootstrap(), 32) : | |
primaryBackupPartitionGroup("data", 32)); | |
} | |
return builder.build(); | |
} | |
private static ManagedPartitionGroup primaryBackupPartitionGroup(String groupName, int numPartitions) { | |
return PrimaryBackupPartitionGroup.builder(groupName) | |
.withNumPartitions(numPartitions) | |
.build(); | |
} | |
private static ManagedPartitionGroup raftPartitionGroup(String groupName, | |
List<ClusterNodeSettings> bootstrapNodes, | |
int numPartitions) { | |
return RaftPartitionGroup.builder(groupName) | |
.withNumPartitions(numPartitions) | |
.withStorageLevel(StorageLevel.MEMORY) | |
.withDataDirectory(Files.createTempDir()) | |
.withMembers(bootstrapNodes.stream() | |
.map(ClusterNodeSettings::getId) | |
.collect(Collectors.toList())) | |
.build(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment