Last active
March 6, 2017 00:35
-
-
Save pkatseas/d53feb5517cd0e728d859f42a7f735ae to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
private Set<ExecutorDetails> getAliveExecutors(Cluster cluster, TopologyDetails topologyDetails) { | |
// Get the existing assignment of the current topology as it's live in the cluster | |
SchedulerAssignment existingAssignment = cluster.getAssignmentById(topologyDetails.getId()); | |
// Return alive executors, if any, otherwise an empty set | |
if (existingAssignment != null) { | |
return existingAssignment.getExecutors(); | |
} else { | |
return new HashSet<ExecutorDetails>(); | |
} | |
} | |
private Map<String, ArrayList<ExecutorDetails>> getExecutorsToBeScheduledByTag( | |
Cluster cluster, | |
TopologyDetails topologyDetails, | |
Map<String, ArrayList<String>> componentsPerTag | |
) { | |
// Initialise the return value | |
Map<String, ArrayList<ExecutorDetails>> executorsByTag = new HashMap<String, ArrayList<ExecutorDetails>>(); | |
// Find which topology executors are already assigned | |
Set<ExecutorDetails> aliveExecutors = getAliveExecutors(cluster, topologyDetails); | |
// Get a map of component to executors for the topology that need scheduling | |
Map<String, List<ExecutorDetails>> executorsByComponent = cluster.getNeedsSchedulingComponentToExecutors( | |
topologyDetails | |
); | |
// Loop through componentsPerTag to populate the map | |
for (Entry<String, ArrayList<String>> entry : componentsPerTag.entrySet()) { | |
String tag = entry.getKey(); | |
ArrayList<String> componentIDs = entry.getValue(); | |
// Initialise the map entry for the current tag | |
ArrayList<ExecutorDetails> executorsForTag = new ArrayList<ExecutorDetails>(); | |
// Loop through this tag's component IDs | |
for (String componentID : componentIDs) { | |
// Fetch the executors for the current component ID | |
List<ExecutorDetails> executorsForComponent = executorsByComponent.get(componentID); | |
if (executorsForComponent == null) { | |
continue; | |
} | |
// Convert the list of executors to a set | |
Set<ExecutorDetails> executorsToAssignForComponent = new HashSet<ExecutorDetails>( | |
executorsForComponent | |
); | |
// Remove already assigned executors from the set of executors to assign, if any | |
executorsToAssignForComponent.removeAll(aliveExecutors); | |
// Add the component's waiting to be assigned executors to the current tag executors | |
executorsForTag.addAll(executorsToAssignForComponent); | |
} | |
// Populate the map of executors by tag after looping through all of the tag's components, | |
// if there are any executors to be scheduled | |
if (!executorsForTag.isEmpty()) { | |
executorsByTag.put(tag, executorsForTag); | |
} | |
} | |
return executorsByTag; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment