Skip to content

Instantly share code, notes, and snippets.

@pkatseas
Last active March 6, 2017 00:35
Show Gist options
  • Save pkatseas/d53feb5517cd0e728d859f42a7f735ae to your computer and use it in GitHub Desktop.
Save pkatseas/d53feb5517cd0e728d859f42a7f735ae to your computer and use it in GitHub Desktop.
/**
 * Looks up the executors of the given topology that already have an assignment in the cluster.
 *
 * @param cluster the cluster queried for the topology's current assignment
 * @param topologyDetails the topology whose ID is used for the lookup
 * @return the executors reported by the existing assignment, or a new empty set when the
 *         cluster holds no assignment for the topology
 */
private Set<ExecutorDetails> getAliveExecutors(Cluster cluster, TopologyDetails topologyDetails) {
    // Assignment of this topology as the cluster currently sees it (null when none exists)
    SchedulerAssignment currentAssignment = cluster.getAssignmentById(topologyDetails.getId());
    if (currentAssignment == null) {
        // Nothing assigned yet: hand back an empty set so callers never deal with null
        return new HashSet<ExecutorDetails>();
    }
    return currentAssignment.getExecutors();
}
/**
 * Groups the topology's executors that still need scheduling by tag.
 *
 * <p>For every tag in {@code componentsPerTag}, the executors of each listed component are
 * taken from the cluster's needs-scheduling map, executors that already appear in the
 * topology's existing assignment are dropped, and the remainder is collected under the tag.
 *
 * @param cluster the cluster queried for existing assignments and pending executors
 * @param topologyDetails the topology being scheduled
 * @param componentsPerTag map from tag to the IDs of the components carrying that tag
 * @return map from tag to the executors awaiting assignment; tags with no such executors are
 *         left out of the map
 */
private Map<String, ArrayList<ExecutorDetails>> getExecutorsToBeScheduledByTag(
    Cluster cluster,
    TopologyDetails topologyDetails,
    Map<String, ArrayList<String>> componentsPerTag
) {
    Map<String, ArrayList<ExecutorDetails>> pendingByTag =
        new HashMap<String, ArrayList<ExecutorDetails>>();

    // Executors that are already part of the topology's live assignment
    Set<ExecutorDetails> alreadyAssigned = getAliveExecutors(cluster, topologyDetails);

    // Component ID -> executors the cluster reports as needing scheduling
    Map<String, List<ExecutorDetails>> pendingByComponent =
        cluster.getNeedsSchedulingComponentToExecutors(topologyDetails);

    for (Entry<String, ArrayList<String>> tagEntry : componentsPerTag.entrySet()) {
        ArrayList<ExecutorDetails> pendingForTag = new ArrayList<ExecutorDetails>();

        for (String componentId : tagEntry.getValue()) {
            List<ExecutorDetails> componentExecutors = pendingByComponent.get(componentId);
            if (componentExecutors != null) {
                // Work on a set copy so the already-assigned executors can be stripped out
                Set<ExecutorDetails> unassigned = new HashSet<ExecutorDetails>(componentExecutors);
                unassigned.removeAll(alreadyAssigned);
                pendingForTag.addAll(unassigned);
            }
        }

        // Only tags that actually have executors waiting get an entry in the result
        if (pendingForTag.isEmpty()) {
            continue;
        }
        pendingByTag.put(tagEntry.getKey(), pendingForTag);
    }

    return pendingByTag;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment