Last active
March 6, 2017 01:36
-
-
Save pkatseas/69af51bd421c05baad7ecde98f22b7f8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
private void handleUnsuccessfulScheduling( | |
Cluster cluster, | |
TopologyDetails topologyDetails, | |
String message | |
) throws Exception { | |
// This is the prefix of the message displayed on Storm's UI for any unsuccessful scheduling | |
String unsuccessfulSchedulingMessage = "SCHEDULING FAILED: "; | |
cluster.setStatus(topologyDetails.getId(), unsuccessfulSchedulingMessage + message); | |
throw new Exception(message); | |
} | |
private Set<WorkerSlot> getAliveSlots(Cluster cluster, TopologyDetails topologyDetails) { | |
// Get the existing assignment of the current topology as it's live in the cluster | |
SchedulerAssignment existingAssignment = cluster.getAssignmentById(topologyDetails.getId()); | |
// Return alive slots, if any, otherwise an empty set | |
if (existingAssignment != null) { | |
return existingAssignment.getSlots(); | |
} else { | |
return new HashSet<WorkerSlot>(); | |
} | |
} | |
private List<WorkerSlot> getSlotsToAssign( | |
Cluster cluster, | |
TopologyDetails topologyDetails, | |
List<SupervisorDetails> supervisors, | |
List<String> componentsForTag, | |
String tag | |
) throws Exception { | |
String topologyID = topologyDetails.getId(); | |
// Collect the available slots of each of the supervisors we were given in a list | |
List<WorkerSlot> availableSlots = new ArrayList<WorkerSlot>(); | |
for (SupervisorDetails supervisor : supervisors) { | |
availableSlots.addAll(cluster.getAvailableSlots(supervisor)); | |
} | |
if (availableSlots.isEmpty()) { | |
// This is bad, we have supervisors and executors to assign, but no available slots! | |
String message = String.format( | |
"No slots are available for assigning executors for tag %s (components: %s)", | |
tag, componentsForTag | |
); | |
handleUnsuccessfulScheduling(cluster, topologyDetails, message); | |
} | |
Set<WorkerSlot> aliveSlots = getAliveSlots(cluster, topologyDetails); | |
int numAvailableSlots = availableSlots.size(); | |
int numSlotsNeeded = topologyDetails.getNumWorkers() - aliveSlots.size(); | |
// We want to check that we have enough available slots | |
// based on the topology's number of workers and already assigned slots. | |
if (numAvailableSlots < numSlotsNeeded) { | |
// This is bad, we don't have enough slots to assign to! | |
String message = String.format( | |
"Not enough slots available for assigning executors for tag %s (components: %s). " | |
+ "Need %s slots to schedule but found only %s", | |
tag, componentsForTag, numSlotsNeeded, numAvailableSlots | |
); | |
handleUnsuccessfulScheduling(cluster, topologyDetails, message); | |
} | |
// Now we can use only as many slots as are required. | |
return availableSlots.subList(0, numSlotsNeeded); | |
} | |
private Map<WorkerSlot, ArrayList<ExecutorDetails>> getExecutorsBySlot( | |
List<WorkerSlot> slots, | |
List<ExecutorDetails> executors | |
) { | |
Map<WorkerSlot, ArrayList<ExecutorDetails>> assignments = new HashMap<WorkerSlot, ArrayList<ExecutorDetails>>(); | |
int numberOfSlots = slots.size(); | |
// We want to split the executors as evenly as possible, across each slot available, | |
// so we assign each executor to a slot via round robin | |
for (int i = 0; i < executors.size(); i++) { | |
WorkerSlot slotToAssign = slots.get(i % numberOfSlots); | |
ExecutorDetails executorToAssign = executors.get(i); | |
if (assignments.containsKey(slotToAssign)) { | |
// If we've already seen this slot, then just add the executor to the existing ArrayList. | |
assignments.get(slotToAssign).add(executorToAssign); | |
} else { | |
// If this slot is new, then create a new ArrayList, | |
// add the current executor, and populate the map's slot entry with it. | |
ArrayList<ExecutorDetails> newExecutorList = new ArrayList<ExecutorDetails>(); | |
newExecutorList.add(executorToAssign); | |
assignments.put(slotToAssign, newExecutorList); | |
} | |
} | |
return assignments; | |
} | |
private void populateComponentExecutorsToSlotsMap( | |
Map<WorkerSlot, ArrayList<ExecutorDetails>> componentExecutorsToSlotsMap, | |
Cluster cluster, | |
TopologyDetails topologyDetails, | |
List<SupervisorDetails> supervisors, | |
List<ExecutorDetails> executors, | |
List<String> componentsForTag, | |
String tag | |
) throws Exception { | |
String topologyID = topologyDetails.getId(); | |
if (supervisors == null) { | |
// This is bad, we don't have any supervisors but have executors to assign! | |
String message = String.format( | |
"No supervisors given for executors %s of topology %s and tag %s (components: %s)", | |
executors, topologyID, tag, componentsForTag | |
); | |
handleUnsuccessfulScheduling(cluster, topologyDetails, message); | |
} | |
List<WorkerSlot> slotsToAssign = getSlotsToAssign( | |
cluster, topologyDetails, supervisors, componentsForTag, tag | |
); | |
// Divide the executors evenly across the slots and get a map of slot to executors | |
Map<WorkerSlot, ArrayList<ExecutorDetails>> executorsBySlot = getExecutorsBySlot( | |
slotsToAssign, executors | |
); | |
for (Entry<WorkerSlot, ArrayList<ExecutorDetails>> entry : executorsBySlot.entrySet()) { | |
WorkerSlot slotToAssign = entry.getKey(); | |
ArrayList<ExecutorDetails> executorsToAssign = entry.getValue(); | |
// Assign the topology's executors to slots in the cluster's supervisors | |
componentExecutorsToSlotsMap.put(slotToAssign, executorsToAssign); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment