Skip to content

Instantly share code, notes, and snippets.

@mmanco
Last active September 8, 2018 00:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mmanco/fce3cec41adad1a314fd72b3b5095dc6 to your computer and use it in GitHub Desktop.
Save mmanco/fce3cec41adad1a314fd72b3b5095dc6 to your computer and use it in GitHub Desktop.
/**
 * {@link ClusterFactory} that builds and starts an {@link Atomix} instance.
 *
 * <p>When bootstrap validation is enabled in the {@link ClusterSettings}, the factory schedules a
 * monitor task that checks whether the local node address and all bootstrap node addresses can be
 * parsed/resolved. The Atomix instance is only built once every address is valid; if that does not
 * happen within the configured validation duration, or an address is malformed, the bootstrap
 * fails. When validation is disabled the instance is built immediately.
 *
 * <p>Failures are logged and surface to callers of {@link #create()} as a future that completes
 * with {@code null} (not exceptionally).
 */
public class AtomixClusterFactory implements ClusterFactory<Atomix> {

    private static final Logger logger = LoggerFactory.getLogger(AtomixClusterFactory.class);

    private final SchedulingService schedulingService;
    private final ClusterSettings settings;
    private final Function<ClusterSettings, Atomix> builder;

    /** Completed with the built (not yet started) Atomix instance, or exceptionally on failure. */
    private final CompletableFuture<Atomix> bootstrap;

    /** Derived from {@link #bootstrap} in {@link #initialize()}; {@code null} until then. */
    private CompletableFuture<Atomix> cluster;

    /** Handle of the scheduled validation task; {@code null} when validation is disabled. */
    private volatile ScheduledFuture<?> bootstrapMonitor;

    /** {@link System#nanoTime()} reading taken when validation started. */
    private volatile long bootstrapStartNanos;

    /**
     * @param schedulingService used to schedule the bootstrap validation task
     * @param settings          cluster configuration
     * @param builder           turns the settings into an Atomix instance (see {@link #buildAtomix})
     */
    AtomixClusterFactory(SchedulingService schedulingService, ClusterSettings settings,
                         Function<ClusterSettings, Atomix> builder) {
        this.schedulingService = schedulingService;
        this.settings = settings;
        this.builder = builder;
        this.bootstrap = new CompletableFuture<>();
    }

    /**
     * Kicks off the bootstrap: either schedules the address validation task or builds the Atomix
     * instance right away, then wires the future returned by {@link #create()}.
     */
    @PostConstruct
    public void initialize() {
        if (settings.isBootstrapValidationEnabled()) {
            // Monotonic clock: elapsed-time checks must not be affected by wall-clock changes.
            bootstrapStartNanos = System.nanoTime();
            bootstrapMonitor = schedulingService.schedule(this::bootstrapCluster,
                    settings.getBootstrapValidation().getScheduling());
            // The task may already have run (and finished the bootstrap) before the handle was
            // stored above; in that case it could not cancel itself, so do it here.
            if (bootstrap.isDone()) {
                stopBootstrapMonitor();
            }
        } else {
            bootstrap.complete(builder.apply(settings));
        }

        cluster = bootstrap
                .thenCompose(atomix -> atomix.start().thenApply(signal -> atomix))
                .exceptionally(e -> {
                    logger.error("Cluster bootstrap failed", e);
                    // Callers observe a null result rather than an exceptional completion.
                    return null;
                });
    }

    /**
     * Returns the future of the started Atomix instance.
     *
     * @return the cluster future; completes with {@code null} if the bootstrap failed. The
     *         reference itself is {@code null} if {@link #initialize()} has not run yet.
     */
    @Override
    public CompletableFuture<Atomix> create() {
        return cluster;
    }

    /**
     * One run of the validation task: builds the Atomix instance once all node addresses are
     * valid, or fails the bootstrap once the configured validation duration has elapsed.
     */
    private void bootstrapCluster() {
        // A previous run already decided the outcome but could not cancel the task (handle not
        // yet published); stop now instead of validating and building again.
        if (bootstrap.isDone()) {
            stopBootstrapMonitor();
            return;
        }

        boolean valid = isValidNode(settings.getNode())
                && settings.getBootstrap().stream().allMatch(this::isValidNode);

        // isValidNode fails the bootstrap itself on a malformed address; do not additionally
        // report a misleading "not reachable" timeout for that case.
        if (bootstrap.isDone()) {
            stopBootstrapMonitor();
            return;
        }

        if (valid) {
            logger.info("All cluster nodes are reachable");
            Atomix atomix;
            try {
                atomix = builder.apply(settings);
            } catch (RuntimeException e) {
                // Without this the future would never complete and create() would hang forever.
                bootstrap.completeExceptionally(e);
                stopBootstrapMonitor();
                return;
            }
            bootstrap.complete(atomix);
            stopBootstrapMonitor();
            return;
        }

        long timeoutMillis = settings.getBootstrapValidation().getDurationInMillis();
        long elapsedMillis = (System.nanoTime() - bootstrapStartNanos) / 1_000_000L;
        if (elapsedMillis >= timeoutMillis) {
            logger.warn("Cluster nodes were not reachable within {}",
                    formatDurationWords(timeoutMillis, true, true));
            bootstrap.completeExceptionally(new RuntimeException("Cluster nodes not reachable"));
            stopBootstrapMonitor();
        }
    }

    /**
     * Checks whether the node's address can be turned into an {@code Address}.
     *
     * <p>An address that fails because of an {@link UnknownHostException} is treated as
     * "not reachable yet" and only logged. Any other malformed address is considered a
     * configuration error: the bootstrap is failed and the monitor is stopped.
     *
     * @param node node whose address is checked
     * @return {@code true} if the address was accepted, {@code false} otherwise
     */
    private boolean isValidNode(ClusterNodeSettings node) {
        try {
            Address.from(node.getAddress());
            return true;
        } catch (MalformedAddressException e) {
            if (e.getCause() instanceof UnknownHostException) {
                logger.warn("Node {} with address {} is not reachable", node.getId(), node.getAddress());
            } else {
                logger.error("Node {} with address {} is not valid", node.getId(), node.getAddress(), e);
                bootstrap.completeExceptionally(e);
                stopBootstrapMonitor();
            }
            return false;
        }
    }

    /**
     * Cancels the validation task if its handle is available.
     *
     * <p>Uses {@code cancel(false)}: this is called from within the task itself, so requesting an
     * interrupt could interrupt the thread that is currently executing it.
     */
    private void stopBootstrapMonitor() {
        ScheduledFuture<?> monitor = bootstrapMonitor;
        if (monitor != null) {
            monitor.cancel(false);
        }
    }

    /**
     * Default builder: creates an Atomix instance from the given settings.
     *
     * <p>Member id and address come from the local node settings. Multicast discovery is enabled
     * when no bootstrap nodes are configured; otherwise a {@code BootstrapDiscoveryProvider} with
     * the configured bootstrap nodes is used. If the local node is a bootstrap node, a management
     * group ("system", 1 partition) and a data group ("data", 32 partitions) are configured, each
     * as Raft when the corresponding protocol setting is {@code Raft} and as primary-backup
     * otherwise.
     *
     * @param settings cluster configuration
     * @return the built (not started) Atomix instance
     */
    static Atomix buildAtomix(ClusterSettings settings) {
        AtomixBuilder builder = Atomix.builder()
                .withMemberId(settings.getNode().getId())
                .withAddress(settings.getNode().getAddress())
                .withShutdownHookEnabled();

        if (settings.getBootstrap().isEmpty()) {
            builder.withMulticastEnabled();
        } else {
            builder.withMembershipProvider(BootstrapDiscoveryProvider.builder()
                    .withNodes(settings.getBootstrap()
                            .stream()
                            .map(bootstrapNode -> Node.builder()
                                    .withId(bootstrapNode.getId())
                                    .withAddress(bootstrapNode.getAddress())
                                    .build())
                            .collect(Collectors.toList()))
                    .build());
        }

        if (settings.isBootstrapNode()) {
            ClusterSettings.Protocol managementGroupProtocol =
                    settings.getManagementPartition().getProtocol();
            ClusterSettings.Protocol dataGroupProtocol =
                    settings.getDataPartition().getProtocol();
            builder
                    .withManagementGroup(managementGroupProtocol == ClusterSettings.Protocol.Raft
                            ? raftPartitionGroup("system", settings.getBootstrap(), 1)
                            : primaryBackupPartitionGroup("system", 1))
                    .withPartitionGroups(dataGroupProtocol == ClusterSettings.Protocol.Raft
                            ? raftPartitionGroup("data", settings.getBootstrap(), 32)
                            : primaryBackupPartitionGroup("data", 32));
        }
        return builder.build();
    }

    /**
     * Builds a primary-backup partition group.
     *
     * @param groupName     name of the partition group
     * @param numPartitions number of partitions in the group
     */
    private static ManagedPartitionGroup primaryBackupPartitionGroup(String groupName, int numPartitions) {
        return PrimaryBackupPartitionGroup.builder(groupName)
                .withNumPartitions(numPartitions)
                .build();
    }

    /**
     * Builds a Raft partition group with in-memory storage whose members are the ids of the given
     * bootstrap nodes.
     *
     * @param groupName      name of the partition group
     * @param bootstrapNodes nodes whose ids become the group members
     * @param numPartitions  number of partitions in the group
     */
    private static ManagedPartitionGroup raftPartitionGroup(String groupName,
                                                            List<ClusterNodeSettings> bootstrapNodes,
                                                            int numPartitions) {
        return RaftPartitionGroup.builder(groupName)
                .withNumPartitions(numPartitions)
                .withStorageLevel(StorageLevel.MEMORY)
                // NOTE(review): presumably Guava's Files.createTempDir(), which is deprecated in
                // recent Guava versions; a new directory is created on every call. Confirm and
                // consider a configurable data directory.
                .withDataDirectory(Files.createTempDir())
                .withMembers(bootstrapNodes.stream()
                        .map(ClusterNodeSettings::getId)
                        .collect(Collectors.toList()))
                .build();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment