Skip to content

Instantly share code, notes, and snippets.

@mfenniak
Created February 7, 2017 14:24
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mfenniak/04f9c0bea8a1a2e0a747d678117df9f7 to your computer and use it in GitHub Desktop.
Save mfenniak/04f9c0bea8a1a2e0a747d678117df9f7 to your computer and use it in GitHub Desktop.
/**
 * Prints a Graphviz `digraph` for the topology held by [builder] to standard output.
 *
 * One processor topology is built per node group of [builder]. Every distinct source or
 * sink topic across those topologies is emitted once, in sorted order, as a rectangular
 * node. Each topology is then rendered through [printProcessor], numbered by its position
 * in the list of built topologies.
 */
fun printGraphviz(builder: TopologyBuilder) {
    println("digraph {")

    // Build one ProcessorTopology per node group, keyed by the group's id.
    val topologies = builder.nodeGroups().map { group -> builder.build(group.key) }

    // Sources first, then sinks; duplicates are dropped before sorting.
    val consumed = topologies.flatMap { topology -> topology.sourceTopics() }
    val produced = topologies.flatMap { topology -> topology.sinkTopics() }
    val topicNames = (consumed + produced).distinct().sorted()

    println(" node [shape=\"rect\"]")
    topicNames.forEach { name ->
        println(" \"topic-$name\" [label=\"Topic $name\"]")
    }

    topologies.forEachIndexed { position, topology ->
        printProcessor(position, topology)
    }

    println("}")
}
/**
 * Reflective handle on the `topic` field declared by [SinkNode].
 *
 * Looked up on first access and marked accessible so its value can be read through
 * reflection.
 */
val sinkNodeTopicField: Field by lazy {
    SinkNode::class.java.getDeclaredField("topic").apply { isAccessible = true }
}
/**
 * Prints one Graphviz `subgraph` for [processorTopology] to standard output.
 *
 * The subgraph is named `t` followed by [nodeGroup]. It contains an elliptical node for
 * every processor, an edge from each source topic to its source node, an edge from each
 * sink node to the topic it writes to, and an edge from each processor to each of its
 * children.
 *
 * @param nodeGroup number used to name the subgraph
 * @param processorTopology topology whose processors and topic links are printed
 */
fun printProcessor(nodeGroup: Int, processorTopology: ProcessorTopology) {
    println(" subgraph t$nodeGroup {")

    println(" # processor nodes")
    println(" node [shape=\"ellipse\"]")
    processorTopology.processors().forEach { node ->
        println(" \"processor-${node.name()}\" [label=\"Processor ${node.name()}\"]")
    }

    println(" # source nodes")
    processorTopology.sourceTopics().forEach { topic ->
        val source = processorTopology.source(topic)
        println(" \"topic-$topic\" -> \"processor-${source.name()}\"")
    }

    println(" # processors")
    processorTopology.processors().forEach { node ->
        if (node is SinkNode<*, *>) {
            // The topic is read reflectively rather than through the topology's
            // sinkTopics() and sink(...) methods: one processor topology can have several
            // sinks for the same topic, but the ProcessorTopology API cannot return all of
            // them. It just hands back a single arbitrary SinkNode.
            val target = sinkNodeTopicField.get(node) as String
            println(" \"processor-${node.name()}\" -> \"topic-$target\"")
        }

        // Source edges are deliberately NOT derived from each SourceNode's own topics
        // field here: those topic names are not correct, because they are not
        // applicationId-prefixed when they should be. The source edges printed above
        // come from the topology's sourceTopics() instead.

        node.children()
            .filterIsInstance<ProcessorNode<*, *>>()
            .forEach { child ->
                println(" \"processor-${node.name()}\" -> \"processor-${child.name()}\"")
            }
    }

    println(" }")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment