Last active
April 8, 2019 11:10
-
-
Save devinsba/32bf8e1da56a5e368f1d697dfb3b6dd5 to your computer and use it in GitHub Desktop.
Zipkin Sparkstreaming - Vizceral
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.HashMap; | |
import java.util.Map; | |
public class Cluster { | |
String app; | |
String region; | |
String env; | |
Map<String, Metrics> connectionsFrom = new HashMap<>(); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.fasterxml.jackson.annotation.JsonProperty; | |
public class Connection { | |
private String source; | |
private String target; | |
private String clazz; | |
private Metrics metrics; | |
private Connection() { | |
} | |
public static Builder newBuilder() { | |
return new Builder(); | |
} | |
public static final class Builder { | |
private String source; | |
private String target; | |
private String clazz; | |
private Metrics metrics; | |
private Builder() { | |
} | |
public Builder source(String source) { | |
this.source = source; | |
return this; | |
} | |
public Builder target(String target) { | |
this.target = target; | |
return this; | |
} | |
public Builder clazz(String clazz) { | |
this.clazz = clazz; | |
return this; | |
} | |
public Builder metrics(Metrics metrics) { | |
this.metrics = metrics; | |
return this; | |
} | |
public Connection build() { | |
Connection connection = new Connection(); | |
connection.target = this.target; | |
connection.clazz = this.clazz; | |
connection.source = this.source; | |
connection.metrics = this.metrics; | |
return connection; | |
} | |
} | |
public String getSource() { | |
return source; | |
} | |
public String getTarget() { | |
return target; | |
} | |
@JsonProperty("class") | |
public String getClazz() { | |
return clazz; | |
} | |
public Metrics getMetrics() { | |
return metrics; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.spark.SparkConf; | |
import org.apache.spark.streaming.Durations; | |
import org.apache.spark.streaming.api.java.JavaDStream; | |
import org.apache.spark.streaming.api.java.JavaPairDStream; | |
import org.apache.spark.streaming.api.java.JavaStreamingContext; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.boot.CommandLineRunner; | |
import org.springframework.stereotype.Component; | |
import scala.Tuple2; | |
import zipkin.sparkstreaming.stream.kinesis.KinesisStreamFactory; | |
import zipkin2.Span; | |
import zipkin2.codec.SpanBytesDecoder; | |
import java.util.HashMap; | |
import java.util.Map; | |
@Component | |
public class Job implements CommandLineRunner, AutoCloseable { | |
private static final Logger log = LoggerFactory.getLogger(Job.class); | |
static final int WINDOW_LENGTH_SECONDS = 30; | |
@Autowired | |
private KinesisStreamFactory streamFactory; | |
private JavaStreamingContext jssc; | |
public void run(String... strings) throws Exception { | |
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount"); | |
jssc = new JavaStreamingContext(conf, Durations.seconds(1)); | |
JavaDStream<byte[]> collectedSpans = streamFactory.create(jssc); | |
JavaDStream<byte[]> collectedSpansInWindow = collectedSpans.window(Durations.seconds(WINDOW_LENGTH_SECONDS), Durations.seconds(5)); | |
JavaDStream<Span> spans = collectedSpansInWindow.flatMap(b -> SpanBytesDecoder.JSON_V2.decodeList(b).iterator()); | |
JavaPairDStream<String, Iterable<Span>> tracesById = spans | |
.mapToPair(s -> new Tuple2<>(s.traceId(), s)) | |
.groupByKey(); | |
JavaDStream<SpanSummary> spanSummaryJavaDStream = tracesById.flatMap(spanTuples -> { | |
Iterable<Span> spansFromTrace = spanTuples._2; | |
Map<String, SpanSummary> spanSummaryMap = new HashMap<>(); | |
for (Span span : spansFromTrace) { | |
SpanSummary spanSummary = spanSummaryMap.getOrDefault(span.id(), new SpanSummary()); | |
if (spanSummary.traceId == null) { // New CallLink | |
spanSummary.traceId = span.traceId(); | |
spanSummary.spanId = span.id(); | |
} | |
if (span.kind().equals(Span.Kind.CLIENT)) { | |
spanSummary.from = span.localServiceName(); | |
spanSummary.duration = span.duration(); | |
if (!spanSummary.to.equals(SpanSummary.DEFAULT_TO)) { // Server span was set up first | |
spanSummary.timestamp = (spanSummary.timestamp + span.timestamp()) / 2; | |
} else { | |
spanSummary.timestamp = span.timestamp(); | |
} | |
} else if (span.kind().equals(Span.Kind.SERVER)) { | |
spanSummary.to = span.localServiceName(); | |
spanSummary.region = span.tags().getOrDefault("aws.region", "us-east-1"); | |
spanSummary.env = span.tags().getOrDefault("application.environment", "local"); | |
if (!spanSummary.from.equals(SpanSummary.DEFAULT_FROM)) { // Client span was set up first | |
spanSummary.timestamp = (spanSummary.timestamp + span.timestamp()) / 2; | |
} else { | |
spanSummary.timestamp = span.timestamp(); | |
spanSummary.duration = span.duration(); | |
} | |
} | |
spanSummaryMap.put(span.id(), spanSummary); | |
} | |
return spanSummaryMap.values().iterator(); | |
}); | |
spanSummaryJavaDStream.foreachRDD(spanSummaryJavaRDD -> SpanSummaryRegistry.update(spanSummaryJavaRDD.collect())); | |
jssc.start(); | |
} | |
@Override | |
public void close() throws Exception { | |
jssc.stop(); | |
jssc.close(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@CrossOrigin | |
@RestController | |
public class LiveDepsController { | |
@Value("${application.env:nonprod}") | |
private String appEnv; | |
@RequestMapping(path = "/live") | |
public Node live() { | |
return SpanSummaryRegistry.getNode(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.fasterxml.jackson.annotation.JsonIgnore; | |
public class Metrics { | |
private int normalCount = 0; | |
private int dangerCount = 0; | |
private int warningCount = 0; | |
private float divisor; | |
public Metrics(int divisor) { | |
this.divisor = divisor + 0.0f; | |
} | |
public void incNormal() { | |
normalCount++; | |
} | |
public void incNormal(int count) { | |
normalCount += count; | |
} | |
public float getNormal() { | |
return normalCount / divisor; | |
} | |
@JsonIgnore | |
public int getNormalCount() { | |
return normalCount; | |
} | |
public float getDanger() { | |
return dangerCount / divisor; | |
} | |
public float getWarning() { | |
return warningCount / divisor; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.fasterxml.jackson.annotation.JsonProperty; | |
import java.util.List; | |
public class Node { | |
private String name; | |
private RendererType renderer; | |
private String displayName; | |
private List<Node> nodes; | |
private List<Connection> connections; | |
private long updated; | |
private String clazz; | |
private Node() { | |
} | |
public static Builder newBuilder() { | |
return new Builder(); | |
} | |
public static final class Builder { | |
private String name; | |
private RendererType renderer; | |
private String displayName; | |
private List<Node> nodes; | |
private List<Connection> connections; | |
private long updated = System.currentTimeMillis(); | |
private String clazz; | |
private Builder() { | |
} | |
public Builder name(String name) { | |
this.name = name; | |
return this; | |
} | |
public Builder renderer(RendererType renderer) { | |
this.renderer = renderer; | |
return this; | |
} | |
public Builder displayName(String displayName) { | |
this.displayName = displayName; | |
return this; | |
} | |
public Builder nodes(List<Node> nodes) { | |
this.nodes = nodes; | |
return this; | |
} | |
public Builder connections(List<Connection> connections) { | |
this.connections = connections; | |
return this; | |
} | |
public Builder updated(long updated) { | |
this.updated = updated; | |
return this; | |
} | |
public Builder clazz(String clazz) { | |
this.clazz = clazz; | |
return this; | |
} | |
public Node build() { | |
Node node = new Node(); | |
node.renderer = this.renderer; | |
node.name = this.name; | |
node.displayName = this.displayName; | |
node.nodes = this.nodes; | |
node.connections = this.connections; | |
node.updated = this.updated; | |
node.clazz = this.clazz; | |
return node; | |
} | |
} | |
public String getName() { | |
return name; | |
} | |
public RendererType getRenderer() { | |
return renderer; | |
} | |
public String getDisplayName() { | |
return displayName; | |
} | |
public List<Node> getNodes() { | |
return nodes; | |
} | |
public List<Connection> getConnections() { | |
return connections; | |
} | |
public long getUpdated() { | |
return updated; | |
} | |
@JsonProperty("class") | |
public String getClazz() { | |
return clazz; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<dependencies> | |
<dependency> | |
<groupId>org.springframework.boot</groupId> | |
<artifactId>spring-boot-starter</artifactId> | |
</dependency> | |
<dependency> | |
<groupId>org.springframework.boot</groupId> | |
<artifactId>spring-boot-starter-web</artifactId> | |
</dependency> | |
<dependency> | |
<groupId>org.apache.spark</groupId> | |
<artifactId>spark-core_${spark_scala.version}</artifactId> | |
</dependency> | |
<dependency> | |
<groupId>org.apache.spark</groupId> | |
<artifactId>spark-streaming_${spark_scala.version}</artifactId> | |
</dependency> | |
<dependency> | |
<groupId>io.zipkin.aws</groupId> | |
<artifactId>zipkin-sparkstreaming-stream-kinesis</artifactId> | |
<version>0.6.2-SNAPSHOT</version> | |
</dependency> | |
<dependency> | |
<groupId>org.slf4j</groupId> | |
<artifactId>slf4j-log4j12</artifactId> | |
<version>1.7.25</version> | |
</dependency> | |
<dependency> | |
<groupId>org.hibernate</groupId> | |
<artifactId>hibernate-validator</artifactId> | |
</dependency> | |
<dependency> | |
<groupId>org.springframework.boot</groupId> | |
<artifactId>spring-boot-starter-test</artifactId> | |
<scope>test</scope> | |
</dependency> | |
</dependencies> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.fasterxml.jackson.annotation.JsonValue; | |
public enum RendererType { | |
GLOBAL("global"), | |
REGION("region"); | |
private String value; | |
@Override | |
@JsonValue | |
public String toString() { | |
return value; | |
} | |
RendererType(String value) { | |
this.value = value; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class SpanSummary implements Serializable { | |
static final String DEFAULT_FROM = "INTERNET"; | |
static final String DEFAULT_TO = "UNKNOWN"; | |
String traceId; | |
String spanId; | |
long timestamp; | |
long duration; | |
String to = DEFAULT_TO; | |
String from = DEFAULT_FROM; | |
String env; | |
String region; | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.*; | |
import java.util.concurrent.atomic.AtomicReference; | |
import java.util.stream.Collectors; | |
public class SpanSummaryRegistry { | |
private static final AtomicReference<Node> NODE = new AtomicReference<>(Node.newBuilder().build()); | |
private static final Node INTERNET = Node.newBuilder() | |
.renderer(RendererType.REGION).name("INTERNET").displayName("INTERNET").clazz("normal").build(); | |
static void update(List<SpanSummary> spanSummaries) { | |
Map<String, Map<String, Map<String, Cluster>>> clustersByRegion = getClusters(spanSummaries); | |
NODE.set(Node.newBuilder() | |
.name("root") | |
.renderer(RendererType.GLOBAL) | |
.updated(System.currentTimeMillis()) | |
.nodes(getRegionNodes(clustersByRegion)) | |
.connections(connectionsForGlobalView(clustersByRegion)) | |
.build()); | |
} | |
static Node getNode() { | |
return NODE.get(); | |
} | |
private static Map<String, Map<String, Map<String, Cluster>>> getClusters(List<SpanSummary> spanSummaries) { | |
Map<String, Map<String, Map<String, Cluster>>> clusters = new HashMap<>(); | |
spanSummaries.stream().map(s -> s.region).filter(Objects::nonNull).forEach(r -> clusters.put(r, new HashMap<>())); // create the maps by region | |
spanSummaries.stream().map(s -> s.env).filter(Objects::nonNull).forEach(e -> { // create the maps by env in the region maps | |
for (String region : clusters.keySet()) { | |
clusters.get(region).put(e, new HashMap<>()); | |
} | |
}); | |
spanSummaries.stream().filter(s -> s.env != null && s.region != null).forEach(s -> { | |
Cluster cluster = clusters.get(s.region).get(s.env).get(s.to); | |
if (cluster == null) { | |
cluster = new Cluster(); | |
clusters.get(s.region).get(s.env).put(s.to, cluster); | |
cluster.region = s.region; | |
cluster.env = s.env; | |
cluster.app = s.to; | |
} | |
Metrics metrics = cluster.connectionsFrom.get(s.from); | |
if (metrics == null) { | |
metrics = new Metrics(Job.WINDOW_LENGTH_SECONDS); | |
cluster.connectionsFrom.put(s.from, metrics); | |
} | |
metrics.incNormal(); | |
}); | |
return clusters; | |
} | |
private static List<Node> getRegionNodes(Map<String, Map<String, Map<String, Cluster>>> clusters) { | |
List<Node> nodes = new ArrayList<>(); | |
nodes.add(INTERNET); | |
for (String region : clusters.keySet()) { | |
nodes.add(Node.newBuilder() | |
.renderer(RendererType.GLOBAL) | |
.displayName(region) | |
.name(region) | |
.clazz("normal") | |
.connections(connectionsForRegionView(clusters.get(region))) | |
.nodes(getEnvNodes(clusters.get(region))) | |
.build()); | |
} | |
return nodes; | |
} | |
private static List<Node> getEnvNodes(Map<String, Map<String, Cluster>> clustersInRegion) { | |
List<Node> nodes = new ArrayList<>(); | |
nodes.add(INTERNET); | |
for (String env : clustersInRegion.keySet()) { | |
nodes.add(Node.newBuilder() | |
.renderer(RendererType.REGION) | |
.displayName(env) | |
.name(env) | |
.clazz("normal") | |
.connections(connectionsForEnvView(clustersInRegion.get(env))) | |
.nodes(getClusterNodes(clustersInRegion.get(env))) | |
.build()); | |
} | |
return nodes; | |
} | |
private static List<Node> getClusterNodes(Map<String, Cluster> clusters) { | |
List<Node> nodes = new ArrayList<>(); | |
nodes.add(INTERNET); | |
for (String app : clusters.keySet()) { | |
nodes.add(Node.newBuilder() | |
.renderer(RendererType.REGION) | |
.displayName(app) | |
.name(app) | |
.clazz("normal") | |
.build()); | |
} | |
return nodes; | |
} | |
private static List<Connection> connectionsForGlobalView(Map<String, Map<String, Map<String, Cluster>>> clusters) { | |
List<Connection> connections = new ArrayList<>(); | |
for (Map.Entry<String, Map<String, Map<String, Cluster>>> clustersWithRegion : clusters.entrySet()) { | |
List<Connection> connectionsForRegion = new ArrayList<>(); | |
clustersWithRegion.getValue().values().forEach(c -> connectionsForRegion.addAll(connectionsForRegionView(clustersWithRegion.getValue()))); | |
List<Metrics> metricsList = connectionsForRegion.stream().map(Connection::getMetrics).collect(Collectors.toList()); | |
Metrics metrics = consolidateMetrics(metricsList); | |
connections.add(Connection.newBuilder() | |
.source(SpanSummary.DEFAULT_FROM) | |
.target(clustersWithRegion.getKey()) | |
.metrics(metrics) | |
.clazz("normal") | |
.build() | |
); | |
} | |
return connections; | |
} | |
/* | |
* Region view shows the envs in a region | |
*/ | |
private static List<Connection> connectionsForRegionView(Map<String, Map<String, Cluster>> clustersInRegion) { | |
List<Connection> connections = new ArrayList<>(); | |
for (Map.Entry<String, Map<String, Cluster>> clustersWithEnv : clustersInRegion.entrySet()) { | |
List<Connection> connectionsForEnv = new ArrayList<>(); | |
clustersWithEnv.getValue().values().forEach(c -> connectionsForEnv.addAll(connectionsForCluster(c))); | |
List<Metrics> metricsList = connectionsForEnv.stream().map(Connection::getMetrics).collect(Collectors.toList()); | |
Metrics metrics = consolidateMetrics(metricsList); | |
connections.add(Connection.newBuilder() | |
.source(SpanSummary.DEFAULT_FROM) | |
.target(clustersWithEnv.getKey()) | |
.metrics(metrics) | |
.clazz("normal") | |
.build() | |
); | |
} | |
return connections; | |
} | |
/* | |
* Env view is showing the apps in a environment | |
*/ | |
private static List<Connection> connectionsForEnvView(Map<String, Cluster> clustersInEnv) { | |
List<Connection> connections = new ArrayList<>(); | |
for (Map.Entry<String, Cluster> cluster : clustersInEnv.entrySet()) { | |
connections.addAll(connectionsForCluster(cluster.getValue())); | |
} | |
return connections; | |
} | |
/* | |
* The connections to an app in the smallest view | |
*/ | |
private static List<Connection> connectionsForCluster(Cluster cluster) { | |
List<Connection> connections = new ArrayList<>(); | |
for (Map.Entry<String, Metrics> metrics : cluster.connectionsFrom.entrySet()) { | |
connections.add(Connection.newBuilder() | |
.source(metrics.getKey()) | |
.target(cluster.app) | |
.metrics(metrics.getValue()) | |
.clazz("normal") | |
.build() | |
); | |
} | |
return connections; | |
} | |
private static Metrics consolidateMetrics(List<Metrics> metrics) { | |
Metrics result = new Metrics(Job.WINDOW_LENGTH_SECONDS); | |
for (Metrics m : metrics) { | |
result.incNormal(m.getNormalCount()); | |
} | |
return result; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment