Skip to content

Instantly share code, notes, and snippets.

@haoch
Created September 7, 2016 12:35
Show Gist options
  • Save haoch/997344dae626c8858560825e7fe2356b to your computer and use it in GitHub Desktop.
Save haoch/997344dae626c8858560825e7fe2356b to your computer and use it in GitHub Desktop.
@Override
public synchronized void status(final PipelineExecutionEntity entity) throws Exception {
String stormUIUrl= (String) entity.getPipeline().getCluster().getProperties().get(STORM_URL);
entity.requireUpdate(true);
if (LogStormConstants.DeployMode.LOCAL.equals(entity.getPipeline().getMode())) {
if(!ExecutionManager.getInstance().contains(entity.getName())){
LOG.info("Pipeline instance '{}' is not ready yet",entity.getName());
}
PipelineExecutionStatus currentStatus = entity.getStatus();
PipelineExecutionStatus newStatus = ExecutionManager.getWorkerStatus(ExecutionManager.getInstance().get(entity.getName()).getState());
if (!currentStatus.equals(newStatus)) {
LOG.info("Status of pipeline: {} changed from {} to {}", entity, currentStatus, newStatus);
entity.setStatus(newStatus);
entity.setDescription(String.format("Status of pipeline: %s changed from %s to %s", entity, currentStatus, newStatus));
}
} else {
try {
Nimbus.Client client = getStormClient(entity);
String id = entity.getProperties() == null? null:entity.getProperties().getProperty(topology_id_key);
if(id == null || id.isEmpty()) {
for (TopologySummary topologySummary : client.getClusterInfo().get_topologies()) {
if (topologySummary.get_name().equals(entity.getName())) {
id = topologySummary.get_id();
}
}
}
if(id == null || id.isEmpty()){
throw new NotAliveException("Topology named "+entity.getName()+" is not found");
} else {
TopologyInfo topologyInfo = client.getTopologyInfo(id);
entity.setProperty(topology_id_key, topologyInfo.get_id());
entity.setProperty("topology.name", topologyInfo.get_name());
entity.setProperty("topology.status", topologyInfo.get_status());
entity.setProperty("topology.uptime_secs", String.valueOf(topologyInfo.get_uptime_secs()));
entity.setProperty("topology.executors_size", String.valueOf(topologyInfo.get_executors_size()));
entity.setProperty("topology.errors_size", String.valueOf(topologyInfo.get_errors_size()));
entity.setUrl(String.format("%s/topology.html?id=%s",stormUIUrl,topologyInfo.get_id()));
Map<String,List<ErrorInfo>> errors = topologyInfo.get_errors();
StringBuilder sb = new StringBuilder();
int errorInfoSize = 0;
if(topologyInfo.get_errors_size()>0) {
for (Map.Entry<String, List<ErrorInfo>> entry : errors.entrySet()) {
sb.append(entry.getKey());
sb.append(": \n");
for (ErrorInfo errorInfo : entry.getValue()) {
errorInfoSize ++;
sb.append("\t");
sb.append(errorInfo.get_error());
sb.append("\n");
}
}
if(errorInfoSize>0) {
LOG.error(sb.toString());
entity.setDescription(sb.toString());
}
}
entity.setStatus(ExecutionManager.getTopologyStatus(topologyInfo.get_status()));
}
}catch (NotAliveException ex){
LOG.info("{} not alive, change status as STOPPED",entity.getName(),ex);
entity.setStatus(PipelineExecutionStatus.STOPPED);
entity.setProperty("topology.status","NOT_ALIVE");
entity.setDescription(ex.getMessage());
} catch (Exception ex ){
entity.setStatus(PipelineExecutionStatus.STOPPED);
entity.setProperty("topology.status","UNKNOWN");
entity.setDescription(ex.getMessage());
LOG.error(ex.getMessage(), ex);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment