Skip to content

Instantly share code, notes, and snippets.

@devinsba
Last active April 8, 2019 11:10
Show Gist options
  • Save devinsba/32bf8e1da56a5e368f1d697dfb3b6dd5 to your computer and use it in GitHub Desktop.
Zipkin Sparkstreaming - Vizceral
import java.util.HashMap;
import java.util.Map;
// One deployed service ("app") in a specific region/env; the leaf bucket
// used when aggregating span summaries into the Vizceral graph.
public class Cluster {
String app;
String region;
String env;
// Incoming-call metrics keyed by the name of the calling service.
Map<String, Metrics> connectionsFrom = new HashMap<>();
}
import com.fasterxml.jackson.annotation.JsonProperty;
/**
 * An edge in the Vizceral graph: traffic flowing from {@code source} to
 * {@code target}, annotated with request {@link Metrics}. Instances are
 * created exclusively through {@link #newBuilder()} and are effectively
 * immutable once built.
 */
public class Connection {
    private String source;
    private String target;
    private String clazz;
    private Metrics metrics;

    private Connection() {
        // use newBuilder()
    }

    /** @return a fresh, empty builder */
    public static Builder newBuilder() {
        return new Builder();
    }

    public String getSource() {
        return source;
    }

    public String getTarget() {
        return target;
    }

    /** Serialized as "class"; "clazz" sidesteps the Java reserved word. */
    @JsonProperty("class")
    public String getClazz() {
        return clazz;
    }

    public Metrics getMetrics() {
        return metrics;
    }

    /** Fluent builder for {@link Connection}. */
    public static final class Builder {
        private String source;
        private String target;
        private String clazz;
        private Metrics metrics;

        private Builder() {
        }

        public Builder source(String value) {
            this.source = value;
            return this;
        }

        public Builder target(String value) {
            this.target = value;
            return this;
        }

        public Builder clazz(String value) {
            this.clazz = value;
            return this;
        }

        public Builder metrics(Metrics value) {
            this.metrics = value;
            return this;
        }

        /** @return a new Connection carrying the builder's current state */
        public Connection build() {
            Connection built = new Connection();
            built.source = this.source;
            built.target = this.target;
            built.clazz = this.clazz;
            built.metrics = this.metrics;
            return built;
        }
    }
}
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import zipkin.sparkstreaming.stream.kinesis.KinesisStreamFactory;
import zipkin2.Span;
import zipkin2.codec.SpanBytesDecoder;
import java.util.HashMap;
import java.util.Map;
@Component
public class Job implements CommandLineRunner, AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(Job.class);

    /** Length in seconds of the sliding window that summaries are aggregated over. */
    static final int WINDOW_LENGTH_SECONDS = 30;

    @Autowired
    private KinesisStreamFactory streamFactory;

    private JavaStreamingContext jssc;

    /**
     * Starts the streaming pipeline: reads span batches from Kinesis, decodes
     * them as Zipkin v2 JSON, groups spans by trace id over a 30s window
     * (recomputed every 5s), reduces each client/server span pair to a
     * {@link SpanSummary}, and publishes the collected summaries to
     * {@link SpanSummaryRegistry} for the HTTP endpoint to serve.
     */
    public void run(String... strings) throws Exception {
        // TODO(review): app name is a leftover from the Spark docs example; rename.
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
        jssc = new JavaStreamingContext(conf, Durations.seconds(1));
        JavaDStream<byte[]> collectedSpans = streamFactory.create(jssc);
        JavaDStream<byte[]> collectedSpansInWindow =
            collectedSpans.window(Durations.seconds(WINDOW_LENGTH_SECONDS), Durations.seconds(5));
        JavaDStream<Span> spans =
            collectedSpansInWindow.flatMap(b -> SpanBytesDecoder.JSON_V2.decodeList(b).iterator());
        JavaPairDStream<String, Iterable<Span>> tracesById = spans
            .mapToPair(s -> new Tuple2<>(s.traceId(), s))
            .groupByKey();
        JavaDStream<SpanSummary> spanSummaryJavaDStream = tracesById.flatMap(spanTuples -> {
            Iterable<Span> spansFromTrace = spanTuples._2;
            Map<String, SpanSummary> spanSummaryMap = new HashMap<>();
            for (Span span : spansFromTrace) {
                SpanSummary spanSummary = spanSummaryMap.getOrDefault(span.id(), new SpanSummary());
                if (spanSummary.traceId == null) { // New CallLink
                    spanSummary.traceId = span.traceId();
                    spanSummary.spanId = span.id();
                }
                // zipkin2.Span's kind(), timestamp() and duration() are all
                // nullable; the original span.kind().equals(...) threw NPE on
                // spans with no kind, and the unboxed Long arithmetic threw
                // NPE on spans reported without a timestamp/duration.
                long ts = span.timestamp() == null ? 0L : span.timestamp();
                long dur = span.duration() == null ? 0L : span.duration();
                if (span.kind() == Span.Kind.CLIENT) { // null-safe enum compare
                    spanSummary.from = span.localServiceName();
                    spanSummary.duration = dur;
                    if (!spanSummary.to.equals(SpanSummary.DEFAULT_TO)) { // Server span was set up first
                        spanSummary.timestamp = (spanSummary.timestamp + ts) / 2;
                    } else {
                        spanSummary.timestamp = ts;
                    }
                } else if (span.kind() == Span.Kind.SERVER) {
                    spanSummary.to = span.localServiceName();
                    spanSummary.region = span.tags().getOrDefault("aws.region", "us-east-1");
                    spanSummary.env = span.tags().getOrDefault("application.environment", "local");
                    if (!spanSummary.from.equals(SpanSummary.DEFAULT_FROM)) { // Client span was set up first
                        spanSummary.timestamp = (spanSummary.timestamp + ts) / 2;
                    } else {
                        spanSummary.timestamp = ts;
                        spanSummary.duration = dur;
                    }
                }
                spanSummaryMap.put(span.id(), spanSummary);
            }
            return spanSummaryMap.values().iterator();
        });
        spanSummaryJavaDStream.foreachRDD(
            spanSummaryJavaRDD -> SpanSummaryRegistry.update(spanSummaryJavaRDD.collect()));
        jssc.start();
    }

    @Override
    public void close() throws Exception {
        // Guard against close() being invoked before run() created the context.
        if (jssc != null) {
            jssc.stop();
            jssc.close();
        }
    }
}
/**
 * REST endpoint serving the current Vizceral dependency-graph snapshot.
 * CORS-enabled so the Vizceral UI can be hosted on a different origin.
 *
 * The original declared an {@code @Value("${application.env:nonprod}")}
 * field {@code appEnv} that was never read; it has been removed as dead code.
 */
@CrossOrigin
@RestController
public class LiveDepsController {
    /** @return the most recently aggregated dependency graph */
    @RequestMapping(path = "/live")
    public Node live() {
        return SpanSummaryRegistry.getNode();
    }
}
import com.fasterxml.jackson.annotation.JsonIgnore;
/**
 * Per-connection request metrics for Vizceral. Raw counts are kept as ints;
 * the JSON-facing getters expose them as per-second rates by dividing by the
 * window length ("divisor") in seconds.
 *
 * Fix/generalization: the original had no way to increment the danger or
 * warning counters, so {@code getDanger()}/{@code getWarning()} could only
 * ever report 0; symmetric increment methods and @JsonIgnore'd raw-count
 * getters are added, mirroring the existing "normal" API.
 */
public class Metrics {
    private int normalCount = 0;
    private int dangerCount = 0;
    private int warningCount = 0;
    // Window length in seconds, stored as float so the getters yield a rate.
    private float divisor;

    /**
     * @param divisor window length in seconds; must be positive so the rate
     *                getters never divide by zero (the original silently
     *                produced Infinity/NaN for divisor == 0)
     */
    public Metrics(int divisor) {
        if (divisor <= 0) {
            throw new IllegalArgumentException("divisor must be positive: " + divisor);
        }
        this.divisor = divisor + 0.0f;
    }

    public void incNormal() {
        normalCount++;
    }

    public void incNormal(int count) {
        normalCount += count;
    }

    public void incDanger() {
        dangerCount++;
    }

    public void incDanger(int count) {
        dangerCount += count;
    }

    public void incWarning() {
        warningCount++;
    }

    public void incWarning(int count) {
        warningCount += count;
    }

    /** @return normal requests per second over the window */
    public float getNormal() {
        return normalCount / divisor;
    }

    @JsonIgnore
    public int getNormalCount() {
        return normalCount;
    }

    @JsonIgnore
    public int getDangerCount() {
        return dangerCount;
    }

    @JsonIgnore
    public int getWarningCount() {
        return warningCount;
    }

    /** @return danger (error) requests per second over the window */
    public float getDanger() {
        return dangerCount / divisor;
    }

    /** @return warning requests per second over the window */
    public float getWarning() {
        return warningCount / divisor;
    }
}
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
/**
 * A Vizceral graph node (root, region, env, or app). Nodes nest via
 * {@link #getNodes()} and carry the {@link Connection}s between their
 * children. Created exclusively through {@link #newBuilder()}; effectively
 * immutable once built.
 */
public class Node {
    private String name;
    private RendererType renderer;
    private String displayName;
    private List<Node> nodes;
    private List<Connection> connections;
    private long updated;
    private String clazz;

    private Node() {
        // use newBuilder()
    }

    /** @return a fresh builder whose {@code updated} defaults to "now" */
    public static Builder newBuilder() {
        return new Builder();
    }

    public String getName() {
        return name;
    }

    public RendererType getRenderer() {
        return renderer;
    }

    public String getDisplayName() {
        return displayName;
    }

    public List<Node> getNodes() {
        return nodes;
    }

    public List<Connection> getConnections() {
        return connections;
    }

    public long getUpdated() {
        return updated;
    }

    /** Serialized as "class"; "clazz" sidesteps the Java reserved word. */
    @JsonProperty("class")
    public String getClazz() {
        return clazz;
    }

    /** Fluent builder for {@link Node}. */
    public static final class Builder {
        private String name;
        private RendererType renderer;
        private String displayName;
        private List<Node> nodes;
        private List<Connection> connections;
        // Default matches the original: timestamp taken at builder creation.
        private long updated = System.currentTimeMillis();
        private String clazz;

        private Builder() {
        }

        public Builder name(String value) {
            this.name = value;
            return this;
        }

        public Builder renderer(RendererType value) {
            this.renderer = value;
            return this;
        }

        public Builder displayName(String value) {
            this.displayName = value;
            return this;
        }

        public Builder nodes(List<Node> value) {
            this.nodes = value;
            return this;
        }

        public Builder connections(List<Connection> value) {
            this.connections = value;
            return this;
        }

        public Builder updated(long value) {
            this.updated = value;
            return this;
        }

        public Builder clazz(String value) {
            this.clazz = value;
            return this;
        }

        /** @return a new Node carrying the builder's current state */
        public Node build() {
            Node built = new Node();
            built.name = this.name;
            built.renderer = this.renderer;
            built.displayName = this.displayName;
            built.nodes = this.nodes;
            built.connections = this.connections;
            built.updated = this.updated;
            built.clazz = this.clazz;
            return built;
        }
    }
}
<dependencies>
<!-- Spring Boot core plus the embedded web server backing the /live endpoint. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Apache Spark; artifact ids carry the Scala binary version suffix. -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${spark_scala.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${spark_scala.version}</artifactId>
</dependency>
<!-- Kinesis stream source used by Job via KinesisStreamFactory.
     NOTE(review): a -SNAPSHOT version; pin a release before shipping. -->
<dependency>
<groupId>io.zipkin.aws</groupId>
<artifactId>zipkin-sparkstreaming-stream-kinesis</artifactId>
<version>0.6.2-SNAPSHOT</version>
</dependency>
<!-- SLF4J binding routing the Logger in Job to log4j. -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.hibernate</groupId>
<artifactId>hibernate-validator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
import com.fasterxml.jackson.annotation.JsonValue;
/**
 * Vizceral renderer hint for a {@link Node}: "global" for the top-level map,
 * "region" for drill-down views. Serialized to JSON as its string value via
 * the @JsonValue-annotated toString().
 */
public enum RendererType {
    GLOBAL("global"),
    REGION("region");

    // final: enum constant state must never change after construction.
    private final String value;

    RendererType(String value) {
        this.value = value;
    }

    /** @return the wire value Vizceral expects, e.g. "global" */
    @Override
    @JsonValue
    public String toString() {
        return value;
    }
}
/**
 * Mutable aggregate of one client/server span pair, produced by the Spark job
 * and consumed by SpanSummaryRegistry. Serializable because instances travel
 * through Spark closures/RDDs.
 *
 * Serializable is fully qualified because no import statement is visible for
 * this file; serialVersionUID is pinned so the serialized form survives
 * recompilation (Spark ships these between JVMs).
 */
public class SpanSummary implements java.io.Serializable {
    private static final long serialVersionUID = 1L;

    /** Placeholder caller used until/unless a CLIENT span is seen. */
    static final String DEFAULT_FROM = "INTERNET";
    /** Placeholder callee used until/unless a SERVER span is seen. */
    static final String DEFAULT_TO = "UNKNOWN";

    String traceId;
    String spanId;
    long timestamp;
    long duration;
    String to = DEFAULT_TO;
    String from = DEFAULT_FROM;
    String env;
    String region;
}
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
/**
 * Process-wide holder for the latest Vizceral dependency graph. The Spark job
 * replaces the graph wholesale on every micro-batch via {@link #update(List)};
 * the REST layer reads the current snapshot with {@link #getNode()}.
 */
public class SpanSummaryRegistry {
    private static final AtomicReference<Node> NODE =
        new AtomicReference<>(Node.newBuilder().build());

    /** Synthetic node standing in for traffic arriving from outside the system. */
    private static final Node INTERNET = Node.newBuilder()
        .renderer(RendererType.REGION).name("INTERNET").displayName("INTERNET").clazz("normal").build();

    private SpanSummaryRegistry() {
        // static registry; not instantiable
    }

    /** Rebuilds the full region/env/app graph from one window of summaries. */
    static void update(List<SpanSummary> spanSummaries) {
        Map<String, Map<String, Map<String, Cluster>>> clustersByRegion = getClusters(spanSummaries);
        NODE.set(Node.newBuilder()
            .name("root")
            .renderer(RendererType.GLOBAL)
            .updated(System.currentTimeMillis())
            .nodes(getRegionNodes(clustersByRegion))
            .connections(connectionsForGlobalView(clustersByRegion))
            .build());
    }

    static Node getNode() {
        return NODE.get();
    }

    /**
     * Buckets the summaries into region -> env -> app {@link Cluster}s and
     * counts one "normal" request per summary against the calling service.
     * Env maps are pre-created for every region x env combination seen in the
     * batch (preserving the original cross-product behavior), so empty env
     * nodes can still appear in the rendered graph.
     */
    private static Map<String, Map<String, Map<String, Cluster>>> getClusters(List<SpanSummary> spanSummaries) {
        Map<String, Map<String, Map<String, Cluster>>> clusters = new HashMap<>();
        // create the maps by region
        spanSummaries.stream().map(s -> s.region).filter(Objects::nonNull)
            .forEach(r -> clusters.put(r, new HashMap<>()));
        // create the maps by env in every region map
        spanSummaries.stream().map(s -> s.env).filter(Objects::nonNull).forEach(e -> {
            for (String region : clusters.keySet()) {
                clusters.get(region).put(e, new HashMap<>());
            }
        });
        // populate clusters and tally one normal request per summary
        spanSummaries.stream().filter(s -> s.env != null && s.region != null).forEach(s -> {
            Map<String, Cluster> appsInEnv = clusters.get(s.region).get(s.env);
            Cluster cluster = appsInEnv.get(s.to);
            if (cluster == null) {
                cluster = new Cluster();
                cluster.region = s.region;
                cluster.env = s.env;
                cluster.app = s.to;
                appsInEnv.put(s.to, cluster);
            }
            cluster.connectionsFrom
                .computeIfAbsent(s.from, from -> new Metrics(Job.WINDOW_LENGTH_SECONDS))
                .incNormal();
        });
        return clusters;
    }

    /** One node per region, each embedding its env sub-graph. */
    private static List<Node> getRegionNodes(Map<String, Map<String, Map<String, Cluster>>> clusters) {
        List<Node> nodes = new ArrayList<>();
        nodes.add(INTERNET);
        for (Map.Entry<String, Map<String, Map<String, Cluster>>> region : clusters.entrySet()) {
            nodes.add(Node.newBuilder()
                .renderer(RendererType.GLOBAL)
                .displayName(region.getKey())
                .name(region.getKey())
                .clazz("normal")
                .connections(connectionsForRegionView(region.getValue()))
                .nodes(getEnvNodes(region.getValue()))
                .build());
        }
        return nodes;
    }

    /** One node per env within a region, each embedding its app sub-graph. */
    private static List<Node> getEnvNodes(Map<String, Map<String, Cluster>> clustersInRegion) {
        List<Node> nodes = new ArrayList<>();
        nodes.add(INTERNET);
        for (Map.Entry<String, Map<String, Cluster>> env : clustersInRegion.entrySet()) {
            nodes.add(Node.newBuilder()
                .renderer(RendererType.REGION)
                .displayName(env.getKey())
                .name(env.getKey())
                .clazz("normal")
                .connections(connectionsForEnvView(env.getValue()))
                .nodes(getClusterNodes(env.getValue()))
                .build());
        }
        return nodes;
    }

    /** Leaf nodes: one per app in an env. */
    private static List<Node> getClusterNodes(Map<String, Cluster> clusters) {
        List<Node> nodes = new ArrayList<>();
        nodes.add(INTERNET);
        for (String app : clusters.keySet()) {
            nodes.add(Node.newBuilder()
                .renderer(RendererType.REGION)
                .displayName(app)
                .name(app)
                .clazz("normal")
                .build());
        }
        return nodes;
    }

    /**
     * Global view: a single INTERNET -> region connection per region, with
     * metrics consolidated from everything inside the region.
     * BUG FIX: the original looped over the region's env maps with a forEach
     * whose lambda ignored its argument and re-added the whole region's
     * connection list on every iteration, inflating the consolidated counts
     * by the number of envs; the list is now computed exactly once.
     */
    private static List<Connection> connectionsForGlobalView(Map<String, Map<String, Map<String, Cluster>>> clusters) {
        List<Connection> connections = new ArrayList<>();
        for (Map.Entry<String, Map<String, Map<String, Cluster>>> clustersWithRegion : clusters.entrySet()) {
            List<Connection> connectionsForRegion = connectionsForRegionView(clustersWithRegion.getValue());
            List<Metrics> metricsList =
                connectionsForRegion.stream().map(Connection::getMetrics).collect(Collectors.toList());
            Metrics metrics = consolidateMetrics(metricsList);
            connections.add(Connection.newBuilder()
                .source(SpanSummary.DEFAULT_FROM)
                .target(clustersWithRegion.getKey())
                .metrics(metrics)
                .clazz("normal")
                .build()
            );
        }
        return connections;
    }

    /*
     * Region view shows the envs in a region: one INTERNET -> env connection
     * per env, with metrics consolidated over that env's clusters.
     */
    private static List<Connection> connectionsForRegionView(Map<String, Map<String, Cluster>> clustersInRegion) {
        List<Connection> connections = new ArrayList<>();
        for (Map.Entry<String, Map<String, Cluster>> clustersWithEnv : clustersInRegion.entrySet()) {
            List<Connection> connectionsForEnv = new ArrayList<>();
            clustersWithEnv.getValue().values().forEach(c -> connectionsForEnv.addAll(connectionsForCluster(c)));
            List<Metrics> metricsList =
                connectionsForEnv.stream().map(Connection::getMetrics).collect(Collectors.toList());
            Metrics metrics = consolidateMetrics(metricsList);
            connections.add(Connection.newBuilder()
                .source(SpanSummary.DEFAULT_FROM)
                .target(clustersWithEnv.getKey())
                .metrics(metrics)
                .clazz("normal")
                .build()
            );
        }
        return connections;
    }

    /*
     * Env view shows the apps in an environment: each cluster's incoming
     * connections, verbatim.
     */
    private static List<Connection> connectionsForEnvView(Map<String, Cluster> clustersInEnv) {
        List<Connection> connections = new ArrayList<>();
        for (Cluster cluster : clustersInEnv.values()) {
            connections.addAll(connectionsForCluster(cluster));
        }
        return connections;
    }

    /*
     * The connections into one app in the smallest view: one edge per
     * distinct calling service.
     */
    private static List<Connection> connectionsForCluster(Cluster cluster) {
        List<Connection> connections = new ArrayList<>();
        for (Map.Entry<String, Metrics> metrics : cluster.connectionsFrom.entrySet()) {
            connections.add(Connection.newBuilder()
                .source(metrics.getKey())
                .target(cluster.app)
                .metrics(metrics.getValue())
                .clazz("normal")
                .build()
            );
        }
        return connections;
    }

    /** Sums the normal counts of several Metrics into one window-scoped Metrics. */
    private static Metrics consolidateMetrics(List<Metrics> metrics) {
        Metrics result = new Metrics(Job.WINDOW_LENGTH_SECONDS);
        for (Metrics m : metrics) {
            result.incNormal(m.getNormalCount());
        }
        return result;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment