// Created May 4, 2017 01:15
// Task scheduler for Apache Storm with tag support
import org.apache.storm.generated.Bolt;
import org.apache.storm.generated.ComponentCommon;
import org.apache.storm.generated.SpoutSpec;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.scheduler.*;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.*;
/**
 * @author Никита
 */
public class TagAwareScheduler implements IScheduler {
// Fallback tag under which supervisors and components that declare no tags are grouped.
private final String untaggedTag = "untagged";
// SLF4J logger shared by all instances of this scheduler.
private static final Logger LOG = LoggerFactory.getLogger(TagAwareScheduler.class);
private Map<String, ArrayList<SupervisorDetails>> getSupervisorsByTag(
Collection<SupervisorDetails> supervisorDetails
) {
// A map of tag -> supervisors, to help with scheduling of components with specific tags
Map<String, ArrayList<SupervisorDetails>> supervisorsByTag = new HashMap<String, ArrayList<SupervisorDetails>>();
for (SupervisorDetails supervisor : supervisorDetails) {
Map<String, String> metadata = (Map<String, String>) supervisor.getSchedulerMeta();
String tags;
if (metadata == null) {
tags = untaggedTag;
} else {
tags = metadata.get("tags");
if (tags == null) {
tags = untaggedTag;
// If the supervisor has tags attached to it, handle it by populating the supervisorsByTag map.
// Loop through each of the tags to handle individually
for (String tag : tags.split(",")) {
tag = tag.trim();
if (supervisorsByTag.containsKey(tag)) {
// If we've already seen this tag, then just add the supervisor to the existing ArrayList.
} else {
// If this tag is new, then create a new ArrayList<SupervisorDetails>,
// add the current supervisor, and populate the map's tag entry with it.
ArrayList<SupervisorDetails> newSupervisorList = new ArrayList<SupervisorDetails>();
supervisorsByTag.put(tag, newSupervisorList);
return supervisorsByTag;
private <T> void populateComponentsByTag(
Map<String, ArrayList<String>> componentsByTag,
Map<String, T> components
) {
// Type T can be either Bolt or SpoutSpec, so that this logic can be reused for both component types
JSONParser parser = new JSONParser();
for (Map.Entry<String, T> componentEntry : components.entrySet()) {
JSONObject conf = null;
String componentID = componentEntry.getKey();
T component = componentEntry.getValue();
try {
// Get the component's conf irrespective of its type (via java reflection)
Method getCommonComponentMethod = component.getClass().getMethod("get_common");
ComponentCommon commonComponent = (ComponentCommon) getCommonComponentMethod.invoke(component);
conf = (JSONObject) parser.parse(commonComponent.get_json_conf());
} catch (ParseException | NoSuchMethodException | IllegalAccessException | InvocationTargetException ex) {
String tags;
// If there's no config, use a fake tag to group all untagged components
if (conf == null) {
tags = untaggedTag;
} else {
tags = (String) conf.get("tags");
// If there are no tags, use a fake tag to group all untagged components
if (tags == null) {
tags = untaggedTag;
// If the component has tags attached to it, handle it by populating the componentsByTag map.
// Loop through each of the tags to handle individually
for (String tag : tags.split(",")) {
tag = tag.trim();
if (componentsByTag.containsKey(tag)) {
// If we've already seen this tag, then just add the component to the existing ArrayList.
} else {
// If this tag is new, then create a new ArrayList,
// add the current component, and populate the map's tag entry with it.
ArrayList<String> newComponentList = new ArrayList<String>();
componentsByTag.put(tag, newComponentList);
private void populateComponentsByTagWithStormInternals(
Map<String, ArrayList<String>> componentsByTag,
Set<String> components
) {
// Storm uses some internal components, like __acker.
// These components are topology-agnostic and are therefore not accessible through a StormTopology object.
// While a bit hacky, this is a way to make sure that we schedule those components along with our topology ones:
// we treat these internal components as regular untagged components and add them to the componentsByTag map.
for (String componentID : components) {
if (componentID.startsWith("__")) {
if (componentsByTag.containsKey(untaggedTag)) {
// If we've already seen untagged components, then just add the component to the existing ArrayList.
} else {
// If this is the first untagged component we see, then create a new ArrayList,
// add the current component, and populate the map's untagged entry with it.
ArrayList<String> newComponentList = new ArrayList<String>();
componentsByTag.put(untaggedTag, newComponentList);
private Set<ExecutorDetails> getAliveExecutors(Cluster cluster, TopologyDetails topologyDetails) {
// Get the existing assignment of the current topology as it's live in the cluster
SchedulerAssignment existingAssignment = cluster.getAssignmentById(topologyDetails.getId());
// Return alive executors, if any, otherwise an empty set
if (existingAssignment != null) {
return existingAssignment.getExecutors();
} else {
return new HashSet<ExecutorDetails>();
private Map<String, ArrayList<ExecutorDetails>> getExecutorsToBeScheduledByTag(
Cluster cluster,
TopologyDetails topologyDetails,
Map<String, ArrayList<String>> componentsPerTag
) {
// Initialise the return value
Map<String, ArrayList<ExecutorDetails>> executorsByTag = new HashMap<String, ArrayList<ExecutorDetails>>();
// Find which topology executors are already assigned
Set<ExecutorDetails> aliveExecutors = getAliveExecutors(cluster, topologyDetails);
// Get a map of component to executors for the topology that need scheduling
Map<String, List<ExecutorDetails>> executorsByComponent = cluster.getNeedsSchedulingComponentToExecutors(
);"executors to be scheduled = " + executorsByComponent.toString());"components per tag " + componentsPerTag.toString());
// Loop through componentsPerTag to populate the map
for (Map.Entry<String, ArrayList<String>> entry : componentsPerTag.entrySet()) {
String tag = entry.getKey();"getting executors for tag = " + tag);
ArrayList<String> componentIDs = entry.getValue();
// Initialise the map entry for the current tag
ArrayList<ExecutorDetails> executorsForTag = new ArrayList<ExecutorDetails>();
// Loop through this tag's component IDs
for (String componentID : componentIDs) {
// Fetch the executors for the current component ID
List<ExecutorDetails> executorsForComponent = executorsByComponent.get(componentID);
if (executorsForComponent == null) {
// Convert the list of executors to a set
Set<ExecutorDetails> executorsToAssignForComponent = new HashSet<ExecutorDetails>(
// Remove already assigned executors from the set of executors to assign, if any
// Add the component's waiting to be assigned executors to the current tag executors
// Populate the map of executors by tag after looping through all of the tag's components,
// if there are any executors to be scheduled
if (!executorsForTag.isEmpty()) {
executorsByTag.put(tag, executorsForTag);
}"executors by tag " + executorsByTag.toString());
return executorsByTag;
private void handleUnsuccessfulScheduling(
Cluster cluster,
TopologyDetails topologyDetails,
String message
) throws Exception {
// This is the prefix of the message displayed on Storm's UI for any unsuccessful scheduling
String unsuccessfulSchedulingMessage = "SCHEDULING FAILED: ";
cluster.setStatus(topologyDetails.getId(), unsuccessfulSchedulingMessage + message);
throw new Exception(message);
private Set<WorkerSlot> getAliveSlots(Cluster cluster, TopologyDetails topologyDetails) {
// Get the existing assignment of the current topology as it's live in the cluster
SchedulerAssignment existingAssignment = cluster.getAssignmentById(topologyDetails.getId());
// Return alive slots, if any, otherwise an empty set
if (existingAssignment != null) {
return existingAssignment.getSlots();
} else {
return new HashSet<WorkerSlot>();
private List<WorkerSlot> getSlotsToAssign(
Cluster cluster,
TopologyDetails topologyDetails,
List<SupervisorDetails> supervisors,
List<String> componentsForTag,
String tag
) throws Exception {
String topologyID = topologyDetails.getId();
// Collect the available slots of each of the supervisors we were given in a list
List<WorkerSlot> availableSlots = new ArrayList<WorkerSlot>();
for (SupervisorDetails supervisor : supervisors) {
// availableSlots.addAll(cluster.getAvailableSlots(supervisor));
// available slots - слоты, которые в принципе можно использовать (по дефолту 4), но для каждой топологии
// задается свое значение через setNumWorkers()
availableSlots.addAll(cluster.getAvailableSlots(supervisor));"number of available slots for supervisor " + supervisor.getHost() + " = " + cluster.getAvailableSlots(supervisor).size());
if (availableSlots.isEmpty()) {
// This is bad, we have supervisors and executors to assign, but no available slots!
String message = String.format(
"No slots are available for assigning executors for tag %s (components: %s)",
tag, componentsForTag
handleUnsuccessfulScheduling(cluster, topologyDetails, message);
return availableSlots;
// Set<WorkerSlot> aliveSlots = getAliveSlots(cluster, topologyDetails);
//"number of alive slots = " + aliveSlots.size());
//"number of workers = " + topologyDetails.getNumWorkers());
// int numAvailableSlots = availableSlots.size();
// int numSlotsNeeded = topologyDetails.getNumWorkers() - aliveSlots.size();
// // We want to check that we have enough available slots
// // based on the topology's number of workers and already assigned slots.
// if (numAvailableSlots < numSlotsNeeded) {
// // This is bad, we don't have enough slots to assign to!
// String message = String.format(
// "Not enough slots available for assigning executors for tag %s (components: %s). "
// + "Need %s slots to schedule but found only %s",
// tag, componentsForTag, numSlotsNeeded, numAvailableSlots
// );
// handleUnsuccessfulScheduling(cluster, topologyDetails, message);
// }
// // Now we can use only as many slots as are required.
// return availableSlots.subList(0, numSlotsNeeded);
private Map<WorkerSlot, ArrayList<ExecutorDetails>> getExecutorsBySlot(
List<WorkerSlot> slots,
List<ExecutorDetails> executors
) {
Map<WorkerSlot, ArrayList<ExecutorDetails>> assignments = new HashMap<WorkerSlot, ArrayList<ExecutorDetails>>();
int numberOfSlots = slots.size();
// We want to split the executors as evenly as possible, across each slot available,
// so we assign each executor to a slot via round robin
for (int i = 0; i < executors.size(); i++) {
WorkerSlot slotToAssign = slots.get(i % numberOfSlots);
ExecutorDetails executorToAssign = executors.get(i);
if (assignments.containsKey(slotToAssign)) {
// If we've already seen this slot, then just add the executor to the existing ArrayList.
} else {
// If this slot is new, then create a new ArrayList,
// add the current executor, and populate the map's slot entry with it.
ArrayList<ExecutorDetails> newExecutorList = new ArrayList<ExecutorDetails>();
assignments.put(slotToAssign, newExecutorList);
return assignments;
private void populateComponentExecutorsToSlotsMap(
Map<WorkerSlot, ArrayList<ExecutorDetails>> componentExecutorsToSlotsMap,
Cluster cluster,
TopologyDetails topologyDetails,
List<SupervisorDetails> supervisors,
List<ExecutorDetails> executors,
List<String> componentsForTag,
String tag
) throws Exception {
String topologyID = topologyDetails.getId();
if (supervisors == null) {
// This is bad, we don't have any supervisors but have executors to assign!
String message = String.format(
"No supervisors given for executors %s of topology %s and tag %s (components: %s)",
executors, topologyID, tag, componentsForTag
handleUnsuccessfulScheduling(cluster, topologyDetails, message);
List<WorkerSlot> slotsToAssign = getSlotsToAssign(
cluster, topologyDetails, supervisors, componentsForTag, tag
// Divide the executors evenly across the slots and get a map of slot to executors
Map<WorkerSlot, ArrayList<ExecutorDetails>> executorsBySlot = getExecutorsBySlot(
slotsToAssign, executors
for (Map.Entry<WorkerSlot, ArrayList<ExecutorDetails>> entry : executorsBySlot.entrySet()) {
WorkerSlot slotToAssign = entry.getKey();
ArrayList<ExecutorDetails> executorsToAssign = entry.getValue();
// Assign the topology's executors to slots in the cluster's supervisors
// Здесь перезаписывались уже ассоциированные executors, поэтому пришлось сделать проверку
// То есть если уже был ассоциирован executor со слотом, то добавить новые executors, а не перезаписать
if (!componentExecutorsToSlotsMap.containsKey(slotToAssign))
componentExecutorsToSlotsMap.put(slotToAssign, executorsToAssign);
else componentExecutorsToSlotsMap.get(slotToAssign).addAll(executorsToAssign);
private void tagAwareSchedule(Topologies topologies, Cluster cluster) {
Collection<SupervisorDetails> supervisorDetails = cluster.getSupervisors().values();
// Get the lists of tagged and unreserved supervisors.
Map<String, ArrayList<SupervisorDetails>> supervisorsByTag = getSupervisorsByTag(supervisorDetails);
for (TopologyDetails topologyDetails : cluster.needsSchedulingTopologies(topologies)) {
StormTopology stormTopology = topologyDetails.getTopology();
String topologyID = topologyDetails.getId();
// Get components from topology
Map<String, Bolt> bolts = stormTopology.get_bolts();
Map<String, SpoutSpec> spouts = stormTopology.get_spouts();
// Get a map of component to executors
Map<String, List<ExecutorDetails>> executorsByComponent = cluster.getNeedsSchedulingComponentToExecutors(
// Get a map of tag to components
Map<String, ArrayList<String>> componentsByTag = new HashMap<String, ArrayList<String>>();
populateComponentsByTag(componentsByTag, bolts);
populateComponentsByTag(componentsByTag, spouts);
populateComponentsByTagWithStormInternals(componentsByTag, executorsByComponent.keySet());
// Get a map of tag to executors
Map<String, ArrayList<ExecutorDetails>> executorsToBeScheduledByTag = getExecutorsToBeScheduledByTag(
cluster, topologyDetails, componentsByTag
// Initialise a map of slot -> executors
Map<WorkerSlot, ArrayList<ExecutorDetails>> componentExecutorsToSlotsMap = (
new HashMap<>()
// Time to match everything up!
for (Map.Entry<String, ArrayList<ExecutorDetails>> entry : executorsToBeScheduledByTag.entrySet()) {
String tag = entry.getKey();"--------------------------");"scheduling tag = " + tag);"executors for tag = " + tag + " : " + entry.getValue().toString());"components for tag = " + tag + " : " + componentsByTag.get(tag).toString());"supervisors for tag = " + tag + " : " + supervisorsByTag.get(tag).toString());
ArrayList<ExecutorDetails> executorsForTag = entry.getValue();
ArrayList<SupervisorDetails> supervisorsForTag = supervisorsByTag.get(tag);
ArrayList<String> componentsForTag = componentsByTag.get(tag);
try {
cluster, topologyDetails, supervisorsForTag, executorsForTag, componentsForTag, tag
);"found slots for tag = " + tag + " : " + componentExecutorsToSlotsMap.toString());
} catch (Exception e) {
// Do the actual assigning
// We do this as a separate step to only perform any assigning if there have been no issues so far.
// That's aimed at avoiding partial scheduling from occurring, with some components already scheduled
// and alive, while others cannot be scheduled.
for (Map.Entry<WorkerSlot, ArrayList<ExecutorDetails>> entry : componentExecutorsToSlotsMap.entrySet()) {
WorkerSlot slotToAssign = entry.getKey();
ArrayList<ExecutorDetails> executorsToAssign = entry.getValue();
cluster.assign(slotToAssign, topologyID, executorsToAssign);
// If we've reached this far, then scheduling must have been successful
public void prepare(Map conf) {"loading tag aware scheduler");
public void schedule(Topologies topologies, Cluster cluster) {"start scheduling with tag aware scheduler");
tagAwareSchedule(topologies, cluster);
