Skip to content

Instantly share code, notes, and snippets.

@BenWhitehead
Created October 8, 2015 02:46
Show Gist options
  • Save BenWhitehead/8267c72e4e894c2e2b29 to your computer and use it in GitHub Desktop.
Save BenWhitehead/8267c72e4e894c2e2b29 to your computer and use it in GitHub Desktop.
Cassandra-Mesos Dynamic reservation
// Ask the cluster whether a reservation should be created for this offer; when the
// result is absent, this snippet does nothing with the offer.
final Optional<TaskResources> reserveResourcesForHost = cassandraCluster.shouldCreateReservation(marker, offer);
if (reserveResourcesForHost.isPresent()) {
    final TaskResources wanted = reserveResourcesForHost.get();

    // One reserved resource per dimension (cpu, memory, disk, ports), each built
    // with the same role and principal.
    final List<Resource> toReserve = newArrayList(
        reserveCpu(wanted.getCpuCores(), mesosRole, principal),
        reserveMem(wanted.getMemMb(), mesosRole, principal),
        reserveDisk(wanted.getDiskMb(), mesosRole, principal),
        reservePorts(wanted.getPortsList(), mesosRole, principal)
    );

    // Wrap the resources in a RESERVE offer operation.
    final Offer.Operation.Reserve reservePayload = Offer.Operation.Reserve.newBuilder()
        .addAllResources(toReserve)
        .build();
    final Offer.Operation reserveOperation = Offer.Operation.newBuilder()
        .setType(Offer.Operation.Type.RESERVE)
        .setReserve(reservePayload)
        .build();

    // Accept exactly this offer with exactly this one operation, using default filters.
    driver.acceptOffers(
        Collections.singletonList(offer.getId()),
        Collections.singletonList(reserveOperation),
        Filters.getDefaultInstance()
    );
}
/**
 * Decides whether a resource reservation should be attempted for the given offer.
 *
 * <p>The checks run in this order, and the first one that fails yields an absent result:
 * <ol>
 *   <li>{@code configuration.isReserveRequired()} must be true;</li>
 *   <li>{@code cassandraNodeForHostname(offer.getHostname())} must be absent;</li>
 *   <li>the current node count must be below {@code configuration.targetNumberOfNodes()};</li>
 *   <li>{@code canLaunchServerTask(now, nextPossibleServerLaunchTimestamp)} must be true;</li>
 *   <li>{@code hasResources(...)} must report no errors for the combined resources.</li>
 * </ol>
 *
 * @param marker logging marker passed to every log statement emitted here
 * @param offer  the offer to evaluate
 * @return the resources obtained by combining {@code EXECUTOR_RESOURCES},
 *         {@code METADATA_TASK_RESOURCES} and the default config role's resources via
 *         {@code add(...)} when all checks pass; otherwise {@code Optional.absent()}
 */
@NotNull
public Optional<TaskResources> shouldCreateReservation(@NotNull final Marker marker, @NotNull final Protos.Offer offer) {
    if (!configuration.isReserveRequired()) {
        LOGGER.info(marker, "Resources Reservation is not enabled");
        return Optional.absent();
    }

    if (cassandraNodeForHostname(offer.getHostname()).isPresent()) {
        LOGGER.info(marker, "Cassandra node already allocated for host.");
        return Optional.absent();
    }

    // These values are gathered up front, before either of the two checks below,
    // so the sequence of calls on clusterState and clock stays fixed.
    final NodeCounts counts = clusterState.nodeCounts();
    final long nowMillis = clock.now().getMillis();
    final long earliestLaunchMillis = nextPossibleServerLaunchTimestamp();
    final boolean launchTimeoutActive = !canLaunchServerTask(nowMillis, earliestLaunchMillis);

    if (counts.getNodeCount() >= configuration.targetNumberOfNodes()) {
        LOGGER.debug(marker, "Number of desired Cassandra Nodes Acquired, no need to create Resource Reservation.");
        return Optional.absent();
    }

    if (launchTimeoutActive) {
        final long secondsToWait = secondsUntilNextPossibleServerLaunch(nowMillis, earliestLaunchMillis);
        LOGGER.info(marker, "Preventing creation of new node because server launch timeout active. Next server launch possible in {}s", secondsToWait);
        return Optional.absent();
    }

    final CassandraConfigRole defaultRole = configuration.getDefaultConfigRole();
    final CassandraFrameworkConfiguration frameworkConfig = configuration.get();
    final TaskResources required = add(
        add(EXECUTOR_RESOURCES, METADATA_TASK_RESOURCES),
        defaultRole.getResources()
    );

    // An empty list from hasResources means the offer had no reported errors.
    final List<String> shortfalls = hasResources(
        offer,
        required,
        portMappings(frameworkConfig),
        defaultRole.getMesosRole()
    );

    if (!shortfalls.isEmpty()) {
        LOGGER.info(
            marker,
            "Insufficient resources in offer for executor, not attempting to launch new node. Details for offer {}: ['{}']",
            offer.getId().getValue(), JOINER.join(shortfalls)
        );
        return Optional.absent();
    }

    LOGGER.info(marker, "Attempting to create Resource Reservation");
    return Optional.of(required);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment