Skip to content

Instantly share code, notes, and snippets.

@paul-brebner
Created December 19, 2018 03:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save paul-brebner/c36192c7a17ea3767e76640e3c1eec41 to your computer and use it in GitHub Desktop.
Save paul-brebner/c36192c7a17ea3767e76640e3c1eec41 to your computer and use it in GitHub Desktop.
Simulator to produce interesting OpenTracing + Jaeger traces across simulated Kafka topics
import java.util.concurrent.ThreadLocalRandom;
// Opentracing APIs
import io.opentracing.Tracer;
import io.opentracing.References;
import io.opentracing.Span;
import io.opentracing.SpanContext;
// Jaeger is a specific tracer
import io.jaegertracing.Configuration;
import io.jaegertracing.Configuration.ReporterConfiguration;
import io.jaegertracing.Configuration.SamplerConfiguration;
import io.jaegertracing.internal.JaegerTracer;
// OpenTracing Kafka simulator with interesting consumer->producer relationships randomly generated
// For dependecies view to work you need multiple tracers!
public class OpenTracingKafkaSimulator
{
public static void main(String[] args)
{
int numSources = 3; // num of source topics
int numOther = 10; // num of other topics
int numSinks = 5; // num sink topics, note some other topics may randomly end up as sinks
int numTopics = numSources + numOther + numSinks;
int maxTraces = 200;
int numTracers = numTopics; // 1 tracer per topic to ensure dependency graph is correct
double densityProb = 0.6; // increase for denser graphs, decrease for sparser graphs
// create 1 tracer per topic
Tracer tracers[] = new Tracer[numTracers];
String tracersName[] = new String[numTracers];
for (int i=0; i < numTracers; i++)
{
String n;
if (i < numSources)
n = "sourceTopic";
else if (i < numSources+numOther)
n = "topic";
else n = "sinkTopic";
tracers[i] = initTracer(n+i);
tracersName[i] = n;
}
// array of from topics -> topics
// if a row has all 0's then topic is a sink, if col has all 0's then topic is a source.
int [][] weights = new int[numTopics][numTopics];
double [][] fromToProb = new double[numTopics][numTopics];
// set all values to 1
for (int i=0; i < numTopics; i++)
for (int j=0; j < numTopics; j++)
weights[i][j] = 1;
// create source topics: set weights in col to 0
for (int i=0; i < numTopics ; i++)
for (int j=0; j < numSources; j++)
weights[i][j] = 0;
// create sink topics: set weights in row to 0
for (int i=(numSources+numOther); i < numTopics ; i++)
for (int j=0; j < numTopics; j++)
weights[i][j] = 0;
// create random weights
for (int i=0; i < numTopics; i++)
for (int j=0; j < numTopics; j++)
{
// to prevent cycles, don't allow topics to connect to lower topics or self
if (j <= i)
weights[i][j] = 0;
else
// decide if non 0 value, and don't change existing 0 values
if (weights[i][j] > 0)
{
if (ThreadLocalRandom.current().nextDouble() < densityProb)
weights[i][j] = ThreadLocalRandom.current().nextInt(1, 101);
else weights[i][j] = 0;
}
}
// compute probabilities from weights
for (int i=0; i < numTopics; i++)
{
double sum = 0;
// compute sum for row
for (int j=0; j < numTopics; j++)
sum += weights[i][j];
if (sum > 0)
{
// compute probabilities
for (int j=0; j < numTopics; j++)
{
double prob = (double)weights[i][j]/sum;
fromToProb[i][j] = prob;
}
// convert to cumulative probabilities
double prev = 0.0;
for (int j=0; j < numTopics; j++)
{
fromToProb[i][j] += prev;
prev = fromToProb[i][j];
}
}
}
// create random traces
for (int i=0; i < maxTraces; i++)
{
// first span in trace has no parent
SpanContext context = null;
// pick random source topic (all source topics currently have equal probability)
int row = ThreadLocalRandom.current().nextInt(0, numSources);
boolean end = false;
while (!end)
{
Span span = tracers[row].buildSpan(tracersName[row]).addReference(References.CHILD_OF, context).start();
// fake some activity
try {
Thread.sleep(1 + (long)(Math.random()*50));
} catch (InterruptedException e) {
e.printStackTrace();
}
span.finish();
context = span.context();
double rand = ThreadLocalRandom.current().nextDouble();
// find 1st row that has cumulative prob > rand, this will be the next topic in the trace
System.out.print("-> topic" + row);
int col = -1;
for (int j=0; j < numTopics; j++)
if (fromToProb[row][j] > rand)
{
col = j;
break;
}
if (col >= 0)
row = col;
else
{
System.out.println();
end = true;
}
}
}
}
// magic stuff to create a tracer
public static JaegerTracer initTracer(String service) {
SamplerConfiguration samplerConfig = SamplerConfiguration.fromEnv().withType("const").withParam(1);
ReporterConfiguration reporterConfig = ReporterConfiguration.fromEnv().withLogSpans(true);
Configuration config = new Configuration(service).withSampler(samplerConfig).withReporter(reporterConfig);
return config.getTracer();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment