Skip to content

Instantly share code, notes, and snippets.

private void tagAwareSchedule(Topologies topologies, Cluster cluster) {
Collection<SupervisorDetails> supervisorDetails = cluster.getSupervisors().values();
// Get the lists of tagged and unreserved supervisors.
Map<String, ArrayList<SupervisorDetails>> supervisorsByTag = getSupervisorsByTag(supervisorDetails);
for (TopologyDetails topologyDetails : cluster.needsSchedulingTopologies(topologies)) {
StormTopology stormTopology = topologyDetails.getTopology();
String topologyID = topologyDetails.getId();
<project
xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"
>
<modelVersion>4.0.0</modelVersion>
<url>http://maven.apache.org</url>
<groupId>com.edited</groupId>
<artifactId>tagawarescheduler</artifactId>
private void handleUnsuccessfulScheduling(
Cluster cluster,
TopologyDetails topologyDetails,
String message
) throws Exception {
// This is the prefix of the message displayed on Storm's UI for any unsuccessful scheduling
String unsuccessfulSchedulingMessage = "SCHEDULING FAILED: ";
cluster.setStatus(topologyDetails.getId(), unsuccessfulSchedulingMessage + message);
throw new Exception(message);
private Set<ExecutorDetails> getAliveExecutors(Cluster cluster, TopologyDetails topologyDetails) {
// Get the existing assignment of the current topology as it's live in the cluster
SchedulerAssignment existingAssignment = cluster.getAssignmentById(topologyDetails.getId());
// Return alive executors, if any, otherwise an empty set
if (existingAssignment != null) {
return existingAssignment.getExecutors();
} else {
return new HashSet<ExecutorDetails>();
}
package com.edited.tagawarescheduler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;
private void populateComponentsByTagWithStormInternals(
Map<String, ArrayList<String>> componentsByTag,
Set<String> components
) {
// Storm uses some internal components, like __acker.
// These components are topology-agnostic and are therefore not accessible through a StormTopology object.
// While a bit hacky, this is a way to make sure that we schedule those components along with our topology ones:
// we treat these internal components as regular untagged components and add them to the componentsByTag map.
for (String componentID : components) {
private <T> void populateComponentsByTag(
Map<String, ArrayList<String>> componentsByTag,
Map<String, T> components
) {
// Type T can be either Bolt or SpoutSpec, so that this logic can be reused for both component types
JSONParser parser = new JSONParser();
for (Entry<String, T> componentEntry : components.entrySet()) {
JSONObject conf = null;
private Map<String, ArrayList<SupervisorDetails>> getSupervisorsByTag(
Collection<SupervisorDetails> supervisorDetails
) {
// A map of tag -> supervisors, to help with scheduling of components with specific tags
Map<String, ArrayList<SupervisorDetails>> supervisorsByTag = new HashMap<String, ArrayList<SupervisorDetails>>();
for (SupervisorDetails supervisor : supervisorDetails) {
@SuppressWarnings("unchecked")
Map<String, String> metadata = (Map<String, String>) supervisor.getSchedulerMeta();

Keybase proof

I hereby claim:

  • I am pkatseas on github.
  • I am pkatseas (https://keybase.io/pkatseas) on keybase.
  • I have a public key whose fingerprint is 1979 C365 A7F9 B974 6D33 FD40 0740 8B2F 7B14 A5D3

To claim this, I am signing this object: