Created
May 4, 2017 01:15
-
-
Save GorshkovNikita/f6987d379f4a5c48e8344f6ec7ded2f0 to your computer and use it in GitHub Desktop.
Планировщик задач для Apache Storm с поддержкой тегов
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.storm.generated.Bolt; | |
import org.apache.storm.generated.ComponentCommon; | |
import org.apache.storm.generated.SpoutSpec; | |
import org.apache.storm.generated.StormTopology; | |
import org.apache.storm.scheduler.*; | |
import org.json.simple.JSONObject; | |
import org.json.simple.parser.JSONParser; | |
import org.json.simple.parser.ParseException; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.lang.reflect.InvocationTargetException; | |
import java.lang.reflect.Method; | |
import java.util.*; | |
/** | |
* @author Никита | |
*/ | |
public class TagAwareScheduler implements IScheduler { | |
private final String untaggedTag = "untagged"; | |
private static final Logger LOG = LoggerFactory.getLogger(TagAwareScheduler.class); | |
private Map<String, ArrayList<SupervisorDetails>> getSupervisorsByTag( | |
Collection<SupervisorDetails> supervisorDetails | |
) { | |
// A map of tag -> supervisors, to help with scheduling of components with specific tags | |
Map<String, ArrayList<SupervisorDetails>> supervisorsByTag = new HashMap<String, ArrayList<SupervisorDetails>>(); | |
for (SupervisorDetails supervisor : supervisorDetails) { | |
@SuppressWarnings("unchecked") | |
Map<String, String> metadata = (Map<String, String>) supervisor.getSchedulerMeta(); | |
String tags; | |
if (metadata == null) { | |
tags = untaggedTag; | |
} else { | |
tags = metadata.get("tags"); | |
if (tags == null) { | |
tags = untaggedTag; | |
} | |
} | |
// If the supervisor has tags attached to it, handle it by populating the supervisorsByTag map. | |
// Loop through each of the tags to handle individually | |
for (String tag : tags.split(",")) { | |
tag = tag.trim(); | |
if (supervisorsByTag.containsKey(tag)) { | |
// If we've already seen this tag, then just add the supervisor to the existing ArrayList. | |
supervisorsByTag.get(tag).add(supervisor); | |
} else { | |
// If this tag is new, then create a new ArrayList<SupervisorDetails>, | |
// add the current supervisor, and populate the map's tag entry with it. | |
ArrayList<SupervisorDetails> newSupervisorList = new ArrayList<SupervisorDetails>(); | |
newSupervisorList.add(supervisor); | |
supervisorsByTag.put(tag, newSupervisorList); | |
} | |
} | |
} | |
return supervisorsByTag; | |
} | |
private <T> void populateComponentsByTag( | |
Map<String, ArrayList<String>> componentsByTag, | |
Map<String, T> components | |
) { | |
// Type T can be either Bolt or SpoutSpec, so that this logic can be reused for both component types | |
JSONParser parser = new JSONParser(); | |
for (Map.Entry<String, T> componentEntry : components.entrySet()) { | |
JSONObject conf = null; | |
String componentID = componentEntry.getKey(); | |
T component = componentEntry.getValue(); | |
try { | |
// Get the component's conf irrespective of its type (via java reflection) | |
Method getCommonComponentMethod = component.getClass().getMethod("get_common"); | |
ComponentCommon commonComponent = (ComponentCommon) getCommonComponentMethod.invoke(component); | |
conf = (JSONObject) parser.parse(commonComponent.get_json_conf()); | |
} catch (ParseException | NoSuchMethodException | IllegalAccessException | InvocationTargetException ex) { | |
ex.printStackTrace(); | |
} | |
String tags; | |
// If there's no config, use a fake tag to group all untagged components | |
if (conf == null) { | |
tags = untaggedTag; | |
} else { | |
tags = (String) conf.get("tags"); | |
// If there are no tags, use a fake tag to group all untagged components | |
if (tags == null) { | |
tags = untaggedTag; | |
} | |
} | |
// If the component has tags attached to it, handle it by populating the componentsByTag map. | |
// Loop through each of the tags to handle individually | |
for (String tag : tags.split(",")) { | |
tag = tag.trim(); | |
if (componentsByTag.containsKey(tag)) { | |
// If we've already seen this tag, then just add the component to the existing ArrayList. | |
componentsByTag.get(tag).add(componentID); | |
} else { | |
// If this tag is new, then create a new ArrayList, | |
// add the current component, and populate the map's tag entry with it. | |
ArrayList<String> newComponentList = new ArrayList<String>(); | |
newComponentList.add(componentID); | |
componentsByTag.put(tag, newComponentList); | |
} | |
} | |
} | |
} | |
private void populateComponentsByTagWithStormInternals( | |
Map<String, ArrayList<String>> componentsByTag, | |
Set<String> components | |
) { | |
// Storm uses some internal components, like __acker. | |
// These components are topology-agnostic and are therefore not accessible through a StormTopology object. | |
// While a bit hacky, this is a way to make sure that we schedule those components along with our topology ones: | |
// we treat these internal components as regular untagged components and add them to the componentsByTag map. | |
for (String componentID : components) { | |
if (componentID.startsWith("__")) { | |
if (componentsByTag.containsKey(untaggedTag)) { | |
// If we've already seen untagged components, then just add the component to the existing ArrayList. | |
componentsByTag.get(untaggedTag).add(componentID); | |
} else { | |
// If this is the first untagged component we see, then create a new ArrayList, | |
// add the current component, and populate the map's untagged entry with it. | |
ArrayList<String> newComponentList = new ArrayList<String>(); | |
newComponentList.add(componentID); | |
componentsByTag.put(untaggedTag, newComponentList); | |
} | |
} | |
} | |
} | |
private Set<ExecutorDetails> getAliveExecutors(Cluster cluster, TopologyDetails topologyDetails) { | |
// Get the existing assignment of the current topology as it's live in the cluster | |
SchedulerAssignment existingAssignment = cluster.getAssignmentById(topologyDetails.getId()); | |
// Return alive executors, if any, otherwise an empty set | |
if (existingAssignment != null) { | |
return existingAssignment.getExecutors(); | |
} else { | |
return new HashSet<ExecutorDetails>(); | |
} | |
} | |
private Map<String, ArrayList<ExecutorDetails>> getExecutorsToBeScheduledByTag( | |
Cluster cluster, | |
TopologyDetails topologyDetails, | |
Map<String, ArrayList<String>> componentsPerTag | |
) { | |
// Initialise the return value | |
Map<String, ArrayList<ExecutorDetails>> executorsByTag = new HashMap<String, ArrayList<ExecutorDetails>>(); | |
// Find which topology executors are already assigned | |
Set<ExecutorDetails> aliveExecutors = getAliveExecutors(cluster, topologyDetails); | |
// Get a map of component to executors for the topology that need scheduling | |
Map<String, List<ExecutorDetails>> executorsByComponent = cluster.getNeedsSchedulingComponentToExecutors( | |
topologyDetails | |
); | |
LOG.info("executors to be scheduled = " + executorsByComponent.toString()); | |
LOG.info("components per tag " + componentsPerTag.toString()); | |
// Loop through componentsPerTag to populate the map | |
for (Map.Entry<String, ArrayList<String>> entry : componentsPerTag.entrySet()) { | |
String tag = entry.getKey(); | |
LOG.info("getting executors for tag = " + tag); | |
ArrayList<String> componentIDs = entry.getValue(); | |
// Initialise the map entry for the current tag | |
ArrayList<ExecutorDetails> executorsForTag = new ArrayList<ExecutorDetails>(); | |
// Loop through this tag's component IDs | |
for (String componentID : componentIDs) { | |
// Fetch the executors for the current component ID | |
List<ExecutorDetails> executorsForComponent = executorsByComponent.get(componentID); | |
if (executorsForComponent == null) { | |
continue; | |
} | |
// Convert the list of executors to a set | |
Set<ExecutorDetails> executorsToAssignForComponent = new HashSet<ExecutorDetails>( | |
executorsForComponent | |
); | |
// Remove already assigned executors from the set of executors to assign, if any | |
executorsToAssignForComponent.removeAll(aliveExecutors); | |
// Add the component's waiting to be assigned executors to the current tag executors | |
executorsForTag.addAll(executorsToAssignForComponent); | |
} | |
// Populate the map of executors by tag after looping through all of the tag's components, | |
// if there are any executors to be scheduled | |
if (!executorsForTag.isEmpty()) { | |
executorsByTag.put(tag, executorsForTag); | |
} | |
} | |
LOG.info("executors by tag " + executorsByTag.toString()); | |
return executorsByTag; | |
} | |
private void handleUnsuccessfulScheduling( | |
Cluster cluster, | |
TopologyDetails topologyDetails, | |
String message | |
) throws Exception { | |
// This is the prefix of the message displayed on Storm's UI for any unsuccessful scheduling | |
String unsuccessfulSchedulingMessage = "SCHEDULING FAILED: "; | |
cluster.setStatus(topologyDetails.getId(), unsuccessfulSchedulingMessage + message); | |
throw new Exception(message); | |
} | |
private Set<WorkerSlot> getAliveSlots(Cluster cluster, TopologyDetails topologyDetails) { | |
// Get the existing assignment of the current topology as it's live in the cluster | |
SchedulerAssignment existingAssignment = cluster.getAssignmentById(topologyDetails.getId()); | |
// Return alive slots, if any, otherwise an empty set | |
if (existingAssignment != null) { | |
return existingAssignment.getSlots(); | |
} else { | |
return new HashSet<WorkerSlot>(); | |
} | |
} | |
private List<WorkerSlot> getSlotsToAssign( | |
Cluster cluster, | |
TopologyDetails topologyDetails, | |
List<SupervisorDetails> supervisors, | |
List<String> componentsForTag, | |
String tag | |
) throws Exception { | |
String topologyID = topologyDetails.getId(); | |
// Collect the available slots of each of the supervisors we were given in a list | |
List<WorkerSlot> availableSlots = new ArrayList<WorkerSlot>(); | |
for (SupervisorDetails supervisor : supervisors) { | |
// availableSlots.addAll(cluster.getAvailableSlots(supervisor)); | |
// available slots - слоты, которые в принципе можно использовать (по дефолту 4), но для каждой топологии | |
// задается свое значение через setNumWorkers() | |
availableSlots.addAll(cluster.getAvailableSlots(supervisor)); | |
LOG.info("number of available slots for supervisor " + supervisor.getHost() + " = " + cluster.getAvailableSlots(supervisor).size()); | |
} | |
if (availableSlots.isEmpty()) { | |
// This is bad, we have supervisors and executors to assign, but no available slots! | |
String message = String.format( | |
"No slots are available for assigning executors for tag %s (components: %s)", | |
tag, componentsForTag | |
); | |
handleUnsuccessfulScheduling(cluster, topologyDetails, message); | |
} | |
return availableSlots; | |
// Set<WorkerSlot> aliveSlots = getAliveSlots(cluster, topologyDetails); | |
// | |
// LOG.info("number of alive slots = " + aliveSlots.size()); | |
// LOG.info("number of workers = " + topologyDetails.getNumWorkers()); | |
// int numAvailableSlots = availableSlots.size(); | |
// int numSlotsNeeded = topologyDetails.getNumWorkers() - aliveSlots.size(); | |
// | |
// // We want to check that we have enough available slots | |
// // based on the topology's number of workers and already assigned slots. | |
// if (numAvailableSlots < numSlotsNeeded) { | |
// // This is bad, we don't have enough slots to assign to! | |
// String message = String.format( | |
// "Not enough slots available for assigning executors for tag %s (components: %s). " | |
// + "Need %s slots to schedule but found only %s", | |
// tag, componentsForTag, numSlotsNeeded, numAvailableSlots | |
// ); | |
// handleUnsuccessfulScheduling(cluster, topologyDetails, message); | |
// } | |
// | |
// // Now we can use only as many slots as are required. | |
// return availableSlots.subList(0, numSlotsNeeded); | |
} | |
private Map<WorkerSlot, ArrayList<ExecutorDetails>> getExecutorsBySlot( | |
List<WorkerSlot> slots, | |
List<ExecutorDetails> executors | |
) { | |
Map<WorkerSlot, ArrayList<ExecutorDetails>> assignments = new HashMap<WorkerSlot, ArrayList<ExecutorDetails>>(); | |
int numberOfSlots = slots.size(); | |
// We want to split the executors as evenly as possible, across each slot available, | |
// so we assign each executor to a slot via round robin | |
for (int i = 0; i < executors.size(); i++) { | |
WorkerSlot slotToAssign = slots.get(i % numberOfSlots); | |
ExecutorDetails executorToAssign = executors.get(i); | |
if (assignments.containsKey(slotToAssign)) { | |
// If we've already seen this slot, then just add the executor to the existing ArrayList. | |
assignments.get(slotToAssign).add(executorToAssign); | |
} else { | |
// If this slot is new, then create a new ArrayList, | |
// add the current executor, and populate the map's slot entry with it. | |
ArrayList<ExecutorDetails> newExecutorList = new ArrayList<ExecutorDetails>(); | |
newExecutorList.add(executorToAssign); | |
assignments.put(slotToAssign, newExecutorList); | |
} | |
} | |
return assignments; | |
} | |
private void populateComponentExecutorsToSlotsMap( | |
Map<WorkerSlot, ArrayList<ExecutorDetails>> componentExecutorsToSlotsMap, | |
Cluster cluster, | |
TopologyDetails topologyDetails, | |
List<SupervisorDetails> supervisors, | |
List<ExecutorDetails> executors, | |
List<String> componentsForTag, | |
String tag | |
) throws Exception { | |
String topologyID = topologyDetails.getId(); | |
if (supervisors == null) { | |
// This is bad, we don't have any supervisors but have executors to assign! | |
String message = String.format( | |
"No supervisors given for executors %s of topology %s and tag %s (components: %s)", | |
executors, topologyID, tag, componentsForTag | |
); | |
handleUnsuccessfulScheduling(cluster, topologyDetails, message); | |
} | |
List<WorkerSlot> slotsToAssign = getSlotsToAssign( | |
cluster, topologyDetails, supervisors, componentsForTag, tag | |
); | |
// Divide the executors evenly across the slots and get a map of slot to executors | |
Map<WorkerSlot, ArrayList<ExecutorDetails>> executorsBySlot = getExecutorsBySlot( | |
slotsToAssign, executors | |
); | |
for (Map.Entry<WorkerSlot, ArrayList<ExecutorDetails>> entry : executorsBySlot.entrySet()) { | |
WorkerSlot slotToAssign = entry.getKey(); | |
ArrayList<ExecutorDetails> executorsToAssign = entry.getValue(); | |
// Assign the topology's executors to slots in the cluster's supervisors | |
// Здесь перезаписывались уже ассоциированные executors, поэтому пришлось сделать проверку | |
// То есть если уже был ассоциирован executor со слотом, то добавить новые executors, а не перезаписать | |
if (!componentExecutorsToSlotsMap.containsKey(slotToAssign)) | |
componentExecutorsToSlotsMap.put(slotToAssign, executorsToAssign); | |
else componentExecutorsToSlotsMap.get(slotToAssign).addAll(executorsToAssign); | |
} | |
} | |
private void tagAwareSchedule(Topologies topologies, Cluster cluster) { | |
Collection<SupervisorDetails> supervisorDetails = cluster.getSupervisors().values(); | |
// Get the lists of tagged and unreserved supervisors. | |
Map<String, ArrayList<SupervisorDetails>> supervisorsByTag = getSupervisorsByTag(supervisorDetails); | |
for (TopologyDetails topologyDetails : cluster.needsSchedulingTopologies(topologies)) { | |
StormTopology stormTopology = topologyDetails.getTopology(); | |
String topologyID = topologyDetails.getId(); | |
// Get components from topology | |
Map<String, Bolt> bolts = stormTopology.get_bolts(); | |
Map<String, SpoutSpec> spouts = stormTopology.get_spouts(); | |
// Get a map of component to executors | |
Map<String, List<ExecutorDetails>> executorsByComponent = cluster.getNeedsSchedulingComponentToExecutors( | |
topologyDetails | |
); | |
// Get a map of tag to components | |
Map<String, ArrayList<String>> componentsByTag = new HashMap<String, ArrayList<String>>(); | |
populateComponentsByTag(componentsByTag, bolts); | |
populateComponentsByTag(componentsByTag, spouts); | |
populateComponentsByTagWithStormInternals(componentsByTag, executorsByComponent.keySet()); | |
// Get a map of tag to executors | |
Map<String, ArrayList<ExecutorDetails>> executorsToBeScheduledByTag = getExecutorsToBeScheduledByTag( | |
cluster, topologyDetails, componentsByTag | |
); | |
// Initialise a map of slot -> executors | |
Map<WorkerSlot, ArrayList<ExecutorDetails>> componentExecutorsToSlotsMap = ( | |
new HashMap<>() | |
); | |
// Time to match everything up! | |
for (Map.Entry<String, ArrayList<ExecutorDetails>> entry : executorsToBeScheduledByTag.entrySet()) { | |
String tag = entry.getKey(); | |
LOG.info("--------------------------"); | |
LOG.info("scheduling tag = " + tag); | |
LOG.info("executors for tag = " + tag + " : " + entry.getValue().toString()); | |
LOG.info("components for tag = " + tag + " : " + componentsByTag.get(tag).toString()); | |
LOG.info("supervisors for tag = " + tag + " : " + supervisorsByTag.get(tag).toString()); | |
ArrayList<ExecutorDetails> executorsForTag = entry.getValue(); | |
ArrayList<SupervisorDetails> supervisorsForTag = supervisorsByTag.get(tag); | |
ArrayList<String> componentsForTag = componentsByTag.get(tag); | |
try { | |
populateComponentExecutorsToSlotsMap( | |
componentExecutorsToSlotsMap, | |
cluster, topologyDetails, supervisorsForTag, executorsForTag, componentsForTag, tag | |
); | |
LOG.info("found slots for tag = " + tag + " : " + componentExecutorsToSlotsMap.toString()); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
return; | |
} | |
} | |
// Do the actual assigning | |
// We do this as a separate step to only perform any assigning if there have been no issues so far. | |
// That's aimed at avoiding partial scheduling from occurring, with some components already scheduled | |
// and alive, while others cannot be scheduled. | |
for (Map.Entry<WorkerSlot, ArrayList<ExecutorDetails>> entry : componentExecutorsToSlotsMap.entrySet()) { | |
WorkerSlot slotToAssign = entry.getKey(); | |
ArrayList<ExecutorDetails> executorsToAssign = entry.getValue(); | |
cluster.assign(slotToAssign, topologyID, executorsToAssign); | |
} | |
// If we've reached this far, then scheduling must have been successful | |
cluster.setStatus(topologyID, "SCHEDULING SUCCESSFUL"); | |
LOG.info("SCHEDULING SUCCESSFUL"); | |
} | |
} | |
@Override | |
@SuppressWarnings("rawtypes") | |
public void prepare(Map conf) { | |
LOG.info("loading tag aware scheduler"); | |
} | |
@Override | |
public void schedule(Topologies topologies, Cluster cluster) { | |
LOG.info("start scheduling with tag aware scheduler"); | |
tagAwareSchedule(topologies, cluster); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment