Created
December 19, 2018 03:47
-
-
Save paul-brebner/c36192c7a17ea3767e76640e3c1eec41 to your computer and use it in GitHub Desktop.
Simulator to produce interesting OpenTracing + Jaeger traces across simulated Kafka topics
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.ThreadLocalRandom; | |
// Opentracing APIs | |
import io.opentracing.Tracer; | |
import io.opentracing.References; | |
import io.opentracing.Span; | |
import io.opentracing.SpanContext; | |
// Jaeger is a specific tracer | |
import io.jaegertracing.Configuration; | |
import io.jaegertracing.Configuration.ReporterConfiguration; | |
import io.jaegertracing.Configuration.SamplerConfiguration; | |
import io.jaegertracing.internal.JaegerTracer; | |
// OpenTracing Kafka simulator with interesting consumer->producer relationships randomly generated | |
// For the dependencies view to work you need multiple tracers!
public class OpenTracingKafkaSimulator | |
{ | |
public static void main(String[] args) | |
{ | |
int numSources = 3; // num of source topics | |
int numOther = 10; // num of other topics | |
int numSinks = 5; // num sink topics, note some other topics may randomly end up as sinks | |
int numTopics = numSources + numOther + numSinks; | |
int maxTraces = 200; | |
int numTracers = numTopics; // 1 tracer per topic to ensure dependency graph is correct | |
double densityProb = 0.6; // increase for denser graphs, decrease for sparser graphs | |
// create 1 tracer per topic | |
Tracer tracers[] = new Tracer[numTracers]; | |
String tracersName[] = new String[numTracers]; | |
for (int i=0; i < numTracers; i++) | |
{ | |
String n; | |
if (i < numSources) | |
n = "sourceTopic"; | |
else if (i < numSources+numOther) | |
n = "topic"; | |
else n = "sinkTopic"; | |
tracers[i] = initTracer(n+i); | |
tracersName[i] = n; | |
} | |
// array of from topics -> topics | |
// if a row has all 0's then topic is a sink, if col has all 0's then topic is a source. | |
int [][] weights = new int[numTopics][numTopics]; | |
double [][] fromToProb = new double[numTopics][numTopics]; | |
// set all values to 1 | |
for (int i=0; i < numTopics; i++) | |
for (int j=0; j < numTopics; j++) | |
weights[i][j] = 1; | |
// create source topics: set weights in col to 0 | |
for (int i=0; i < numTopics ; i++) | |
for (int j=0; j < numSources; j++) | |
weights[i][j] = 0; | |
// create sink topics: set weights in row to 0 | |
for (int i=(numSources+numOther); i < numTopics ; i++) | |
for (int j=0; j < numTopics; j++) | |
weights[i][j] = 0; | |
// create random weights | |
for (int i=0; i < numTopics; i++) | |
for (int j=0; j < numTopics; j++) | |
{ | |
// to prevent cycles, don't allow topics to connect to lower topics or self | |
if (j <= i) | |
weights[i][j] = 0; | |
else | |
// decide if non 0 value, and don't change existing 0 values | |
if (weights[i][j] > 0) | |
{ | |
if (ThreadLocalRandom.current().nextDouble() < densityProb) | |
weights[i][j] = ThreadLocalRandom.current().nextInt(1, 101); | |
else weights[i][j] = 0; | |
} | |
} | |
// compute probabilities from weights | |
for (int i=0; i < numTopics; i++) | |
{ | |
double sum = 0; | |
// compute sum for row | |
for (int j=0; j < numTopics; j++) | |
sum += weights[i][j]; | |
if (sum > 0) | |
{ | |
// compute probabilities | |
for (int j=0; j < numTopics; j++) | |
{ | |
double prob = (double)weights[i][j]/sum; | |
fromToProb[i][j] = prob; | |
} | |
// convert to cumulative probabilities | |
double prev = 0.0; | |
for (int j=0; j < numTopics; j++) | |
{ | |
fromToProb[i][j] += prev; | |
prev = fromToProb[i][j]; | |
} | |
} | |
} | |
// create random traces | |
for (int i=0; i < maxTraces; i++) | |
{ | |
// first span in trace has no parent | |
SpanContext context = null; | |
// pick random source topic (all source topics currently have equal probability) | |
int row = ThreadLocalRandom.current().nextInt(0, numSources); | |
boolean end = false; | |
while (!end) | |
{ | |
Span span = tracers[row].buildSpan(tracersName[row]).addReference(References.CHILD_OF, context).start(); | |
// fake some activity | |
try { | |
Thread.sleep(1 + (long)(Math.random()*50)); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
span.finish(); | |
context = span.context(); | |
double rand = ThreadLocalRandom.current().nextDouble(); | |
// find 1st row that has cumulative prob > rand, this will be the next topic in the trace | |
System.out.print("-> topic" + row); | |
int col = -1; | |
for (int j=0; j < numTopics; j++) | |
if (fromToProb[row][j] > rand) | |
{ | |
col = j; | |
break; | |
} | |
if (col >= 0) | |
row = col; | |
else | |
{ | |
System.out.println(); | |
end = true; | |
} | |
} | |
} | |
} | |
// magic stuff to create a tracer | |
public static JaegerTracer initTracer(String service) { | |
SamplerConfiguration samplerConfig = SamplerConfiguration.fromEnv().withType("const").withParam(1); | |
ReporterConfiguration reporterConfig = ReporterConfiguration.fromEnv().withLogSpans(true); | |
Configuration config = new Configuration(service).withSampler(samplerConfig).withReporter(reporterConfig); | |
return config.getTracer(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment