Skip to content

Instantly share code, notes, and snippets.

@pkatseas
Created March 6, 2017 12:48
Show Gist options
  • Save pkatseas/1c2ebe5ee96d54baeb92045c32eaeacd to your computer and use it in GitHub Desktop.
Save pkatseas/1c2ebe5ee96d54baeb92045c32eaeacd to your computer and use it in GitHub Desktop.
private void tagAwareSchedule(Topologies topologies, Cluster cluster) {
Collection<SupervisorDetails> supervisorDetails = cluster.getSupervisors().values();
// Get the lists of tagged and unreserved supervisors.
Map<String, ArrayList<SupervisorDetails>> supervisorsByTag = getSupervisorsByTag(supervisorDetails);
for (TopologyDetails topologyDetails : cluster.needsSchedulingTopologies(topologies)) {
StormTopology stormTopology = topologyDetails.getTopology();
String topologyID = topologyDetails.getId();
// Get components from topology
Map<String, Bolt> bolts = stormTopology.get_bolts();
Map<String, SpoutSpec> spouts = stormTopology.get_spouts();
// Get a map of component to executors
Map<String, List<ExecutorDetails>> executorsByComponent = cluster.getNeedsSchedulingComponentToExecutors(
topologyDetails
);
// Get a map of tag to components
Map<String, ArrayList<String>> componentsByTag = new HashMap<String, ArrayList<String>>();
populateComponentsByTag(componentsByTag, bolts);
populateComponentsByTag(componentsByTag, spouts);
populateComponentsByTagWithStormInternals(componentsByTag, executorsByComponent.keySet());
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment