I hereby claim:
- I am pkatseas on github.
- I am pkatseas (https://keybase.io/pkatseas) on keybase.
- I have a public key whose fingerprint is 1979 C365 A7F9 B974 6D33 FD40 0740 8B2F 7B14 A5D3
To claim this, I am signing this object:
I hereby claim:
To claim this, I am signing this object:
private Map<String, ArrayList<SupervisorDetails>> getSupervisorsByTag( | |
Collection<SupervisorDetails> supervisorDetails | |
) { | |
// A map of tag -> supervisors, to help with scheduling of components with specific tags | |
Map<String, ArrayList<SupervisorDetails>> supervisorsByTag = new HashMap<String, ArrayList<SupervisorDetails>>(); | |
for (SupervisorDetails supervisor : supervisorDetails) { | |
@SuppressWarnings("unchecked") | |
Map<String, String> metadata = (Map<String, String>) supervisor.getSchedulerMeta(); |
private <T> void populateComponentsByTag( | |
Map<String, ArrayList<String>> componentsByTag, | |
Map<String, T> components | |
) { | |
// Type T can be either Bolt or SpoutSpec, so that this logic can be reused for both component types | |
JSONParser parser = new JSONParser(); | |
for (Entry<String, T> componentEntry : components.entrySet()) { | |
JSONObject conf = null; |
private void populateComponentsByTagWithStormInternals( | |
Map<String, ArrayList<String>> componentsByTag, | |
Set<String> components | |
) { | |
// Storm uses some internal components, like __acker. | |
// These components are topology-agnostic and are therefore not accessible through a StormTopology object. | |
// While a bit hacky, this is a way to make sure that we schedule those components along with our topology ones: | |
// we treat these internal components as regular untagged components and add them to the componentsByTag map. | |
for (String componentID : components) { |
package com.edited.tagawarescheduler; | |
import java.lang.reflect.InvocationTargetException; | |
import java.lang.reflect.Method; | |
import java.util.ArrayList; | |
import java.util.Collection; | |
import java.util.HashMap; | |
import java.util.HashSet; | |
import java.util.List; | |
import java.util.Map.Entry; |
private Set<ExecutorDetails> getAliveExecutors(Cluster cluster, TopologyDetails topologyDetails) { | |
// Get the existing assignment of the current topology as it's live in the cluster | |
SchedulerAssignment existingAssignment = cluster.getAssignmentById(topologyDetails.getId()); | |
// Return alive executors, if any, otherwise an empty set | |
if (existingAssignment != null) { | |
return existingAssignment.getExecutors(); | |
} else { | |
return new HashSet<ExecutorDetails>(); | |
} |
private void handleUnsuccessfulScheduling( | |
Cluster cluster, | |
TopologyDetails topologyDetails, | |
String message | |
) throws Exception { | |
// This is the prefix of the message displayed on Storm's UI for any unsuccessful scheduling | |
String unsuccessfulSchedulingMessage = "SCHEDULING FAILED: "; | |
cluster.setStatus(topologyDetails.getId(), unsuccessfulSchedulingMessage + message); | |
throw new Exception(message); |
<project | |
xmlns="http://maven.apache.org/POM/4.0.0" | |
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | |
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd" | |
> | |
<modelVersion>4.0.0</modelVersion> | |
<url>http://maven.apache.org</url> | |
<groupId>com.edited</groupId> | |
<artifactId>tagawarescheduler</artifactId> |
private void tagAwareSchedule(Topologies topologies, Cluster cluster) { | |
Collection<SupervisorDetails> supervisorDetails = cluster.getSupervisors().values(); | |
// Get the lists of tagged and unreserved supervisors. | |
Map<String, ArrayList<SupervisorDetails>> supervisorsByTag = getSupervisorsByTag(supervisorDetails); | |
for (TopologyDetails topologyDetails : cluster.needsSchedulingTopologies(topologies)) { | |
StormTopology stormTopology = topologyDetails.getTopology(); | |
String topologyID = topologyDetails.getId(); |