Skip to content

Instantly share code, notes, and snippets.

@pkatseas
Last active March 6, 2017 01:36
Show Gist options
  • Save pkatseas/69af51bd421c05baad7ecde98f22b7f8 to your computer and use it in GitHub Desktop.
Save pkatseas/69af51bd421c05baad7ecde98f22b7f8 to your computer and use it in GitHub Desktop.
/**
 * Records a scheduling failure for a topology and aborts the current scheduling attempt.
 *
 * <p>The topology's cluster status is set to {@code "SCHEDULING FAILED: "} followed by
 * {@code message}; that prefix is what Storm's UI displays for any unsuccessful scheduling.
 * An {@link Exception} carrying {@code message} is then thrown unconditionally.
 *
 * @param cluster the cluster whose status entry for the topology is updated
 * @param topologyDetails the topology that could not be scheduled
 * @param message human-readable description of why scheduling failed
 * @throws Exception always, with {@code message} as its detail message
 */
private void handleUnsuccessfulScheduling(
        Cluster cluster,
        TopologyDetails topologyDetails,
        String message
) throws Exception {
    final String topologyId = topologyDetails.getId();
    final String failurePrefix = "SCHEDULING FAILED: ";
    final String uiStatus = failurePrefix + message;
    cluster.setStatus(topologyId, uiStatus);
    throw new Exception(message);
}
/**
 * Returns the worker slots held by the topology's current assignment in the cluster.
 *
 * @param cluster the cluster to look the assignment up in
 * @param topologyDetails the topology whose assignment is wanted
 * @return the slots of the existing assignment, or a new empty set when the cluster has no
 *     assignment for this topology
 */
private Set<WorkerSlot> getAliveSlots(Cluster cluster, TopologyDetails topologyDetails) {
    String topologyId = topologyDetails.getId();
    SchedulerAssignment currentAssignment = cluster.getAssignmentById(topologyId);
    // No assignment yet means the topology currently holds no slots.
    return currentAssignment == null
            ? new HashSet<WorkerSlot>()
            : currentAssignment.getSlots();
}
/**
 * Chooses the worker slots that the executors of a tag should be assigned to.
 *
 * <p>Gathers the available slots of every supervisor in {@code supervisors} and returns the
 * first {@code numWorkers - aliveSlots} of them, where {@code numWorkers} is the topology's
 * configured worker count and {@code aliveSlots} is the number of slots the topology already
 * holds. Scheduling is failed via {@code handleUnsuccessfulScheduling} when no slots are
 * available at all, or when fewer slots are available than are needed.
 *
 * @param cluster the cluster to query for available slots
 * @param topologyDetails the topology being scheduled
 * @param supervisors the supervisors whose slots may be used; must not be {@code null}
 * @param componentsForTag the components associated with {@code tag}; only used in messages
 * @param tag the tag being scheduled; only used in messages
 * @return the slots to use; empty when the topology already holds at least as many slots as
 *     it has workers
 * @throws Exception if no slots, or not enough slots, are available
 */
private List<WorkerSlot> getSlotsToAssign(
        Cluster cluster,
        TopologyDetails topologyDetails,
        List<SupervisorDetails> supervisors,
        List<String> componentsForTag,
        String tag
) throws Exception {
    // Collect the available slots of each of the supervisors we were given in a list
    List<WorkerSlot> availableSlots = new ArrayList<WorkerSlot>();
    for (SupervisorDetails supervisor : supervisors) {
        availableSlots.addAll(cluster.getAvailableSlots(supervisor));
    }

    if (availableSlots.isEmpty()) {
        // This is bad, we have supervisors and executors to assign, but no available slots!
        String message = String.format(
                "No slots are available for assigning executors for tag %s (components: %s)",
                tag, componentsForTag
        );
        handleUnsuccessfulScheduling(cluster, topologyDetails, message);
    }

    Set<WorkerSlot> aliveSlots = getAliveSlots(cluster, topologyDetails);
    int numAvailableSlots = availableSlots.size();
    // Slots the topology already holds count toward its worker total. Clamp at zero: when it
    // already holds at least getNumWorkers() slots, a negative count would make subList below
    // throw IndexOutOfBoundsException instead of meaning "no additional slots needed".
    int numSlotsNeeded = Math.max(0, topologyDetails.getNumWorkers() - aliveSlots.size());

    // We want to check that we have enough available slots
    // based on the topology's number of workers and already assigned slots.
    if (numAvailableSlots < numSlotsNeeded) {
        // This is bad, we don't have enough slots to assign to!
        String message = String.format(
                "Not enough slots available for assigning executors for tag %s (components: %s). "
                        + "Need %s slots to schedule but found only %s",
                tag, componentsForTag, numSlotsNeeded, numAvailableSlots
        );
        handleUnsuccessfulScheduling(cluster, topologyDetails, message);
    }

    // Now we can use only as many slots as are required.
    return availableSlots.subList(0, numSlotsNeeded);
}
/**
 * Distributes executors across slots round robin.
 *
 * <p>The executor at position {@code i} is assigned to {@code slots.get(i % slots.size())}.
 * When the slots are distinct, executor counts per slot therefore differ by at most one.
 * Each slot's executors keep their relative order from {@code executors}. Slots that receive
 * no executor (fewer executors than slots) do not appear in the result.
 *
 * @param slots the slots to fill; may be empty only if {@code executors} is empty
 * @param executors the executors to distribute
 * @return a new map from slot to the executors assigned to it
 * @throws IllegalArgumentException if {@code executors} is non-empty but {@code slots} is empty
 */
private Map<WorkerSlot, ArrayList<ExecutorDetails>> getExecutorsBySlot(
        List<WorkerSlot> slots,
        List<ExecutorDetails> executors
) {
    Map<WorkerSlot, ArrayList<ExecutorDetails>> assignments =
            new HashMap<WorkerSlot, ArrayList<ExecutorDetails>>();
    int numberOfSlots = slots.size();
    if (numberOfSlots == 0 && !executors.isEmpty()) {
        // Without this check the modulo below fails with an opaque "/ by zero".
        throw new IllegalArgumentException(String.format(
                "Cannot assign %d executors: no slots were given", executors.size()));
    }

    // We want to split the executors as evenly as possible, across each slot available,
    // so we assign each executor to a slot via round robin
    int position = 0;
    for (ExecutorDetails executorToAssign : executors) {
        WorkerSlot slotToAssign = slots.get(position % numberOfSlots);
        position++;
        ArrayList<ExecutorDetails> slotExecutors = assignments.get(slotToAssign);
        if (slotExecutors == null) {
            // First executor for this slot: create its list.
            slotExecutors = new ArrayList<ExecutorDetails>();
            assignments.put(slotToAssign, slotExecutors);
        }
        slotExecutors.add(executorToAssign);
    }
    return assignments;
}
/**
 * Assigns the executors of a tag to worker slots and records the result in an output map.
 *
 * <p>Chooses slots from {@code supervisors} with {@code getSlotsToAssign}, spreads
 * {@code executors} across them round robin with {@code getExecutorsBySlot}, and puts every
 * resulting slot-to-executors entry into {@code componentExecutorsToSlotsMap}. An existing
 * entry for the same slot is replaced. When there are no executors, nothing is done.
 *
 * @param componentExecutorsToSlotsMap output map receiving the slot-to-executors assignments
 * @param cluster the cluster being scheduled
 * @param topologyDetails the topology being scheduled
 * @param supervisors the supervisors whose slots may be used
 * @param executors the executors to assign
 * @param componentsForTag the components associated with {@code tag}; only used in messages
 * @param tag the tag being scheduled; only used in messages
 * @throws Exception if executors exist but there are no supervisors, or not enough slots
 */
private void populateComponentExecutorsToSlotsMap(
        Map<WorkerSlot, ArrayList<ExecutorDetails>> componentExecutorsToSlotsMap,
        Cluster cluster,
        TopologyDetails topologyDetails,
        List<SupervisorDetails> supervisors,
        List<ExecutorDetails> executors,
        List<String> componentsForTag,
        String tag
) throws Exception {
    if (executors == null || executors.isEmpty()) {
        // Nothing to assign: don't claim slots or report a scheduling failure.
        return;
    }

    String topologyID = topologyDetails.getId();
    // An empty supervisor list is the same failure as a missing one; without this check it
    // would surface later as a less accurate "No slots are available" message.
    if (supervisors == null || supervisors.isEmpty()) {
        // This is bad, we don't have any supervisors but have executors to assign!
        String message = String.format(
                "No supervisors given for executors %s of topology %s and tag %s (components: %s)",
                executors, topologyID, tag, componentsForTag
        );
        handleUnsuccessfulScheduling(cluster, topologyDetails, message);
    }

    List<WorkerSlot> slotsToAssign = getSlotsToAssign(
            cluster, topologyDetails, supervisors, componentsForTag, tag
    );
    // Divide the executors evenly across the slots and get a map of slot to executors
    Map<WorkerSlot, ArrayList<ExecutorDetails>> executorsBySlot = getExecutorsBySlot(
            slotsToAssign, executors
    );
    // Assign the topology's executors to slots in the cluster's supervisors
    componentExecutorsToSlotsMap.putAll(executorsBySlot);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment