Created
May 27, 2019 23:30
-
-
Save tristanstraub/1c0fc048f83a7e6b6f223d532b5afee7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/sh | |
#_( | |
#_DEPS is same format as deps.edn. Multiline is okay. | |
DEPS='{:deps {aysylu/loom {:mvn/version "1.0.2"} | |
org.apache.kafka/kafka-streams {:mvn/version "2.2.0"}}}' | |
#_You can put other options here | |
OPTS='-J-Xms256m -J-Xmx256m -J-client' | |
exec clojure $OPTS -Sdeps "$DEPS" "$0" "$@" | |
) | |
(require '[loom.graph :as graph] | |
'[loom.io :as loom.io]) | |
(import '(org.apache.kafka.streams TopologyDescription$Source TopologyDescription$Sink) | |
'(org.apache.kafka.streams.kstream ValueMapper) | |
'(org.apache.kafka.streams StreamsBuilder)) | |
(defn topology | |
[in out] | |
(let [builder (StreamsBuilder.) | |
stream (.stream builder in)] | |
(.mapValues stream (reify ValueMapper | |
(apply [_ v] | |
v))) | |
(.build builder))) | |
(defn topology-description | |
[topology] | |
(let [description (.describe topology)] | |
{:global-stores (for [global-store (.globalStores description)] | |
{:processor {:stores (.stores (.processor global-store))} | |
:source {:topics (.topics (.source global-store))}}) | |
:subtopologies (for [subtopology (.subtopologies description)] | |
{:id (.id subtopology) | |
:nodes (for [node (.nodes subtopology)] | |
(merge {:name (.name node) | |
:predecessors (map #(.name %) (.predecessors node)) | |
:successors (map #(.name %) (.successors node))} | |
(cond (instance? TopologyDescription$Source node) | |
{:topics (.topics node)} | |
(instance? TopologyDescription$Sink node) | |
{:topic (.topic node)} | |
(instance? TopologyDescription$Sink node) | |
{:stores (.stores node)})))})})) | |
(defn index-by | |
[k coll] | |
(reduce (fn [m v] | |
(assoc m (k v) v)) | |
{} | |
coll)) | |
(defn view | |
[topology] | |
(let [description (topology-description topology)] | |
(loom.io/view (reduce (fn [graph subtopology] | |
(let [node (:nodes subtopology) | |
idx (index-by :name node)] | |
(reduce (fn [graph node] | |
(let [graph (graph/add-nodes graph (idx (:name node))) | |
graph (reduce (fn [graph predecessor] | |
(graph/add-edges graph [(idx predecessor) | |
(idx (:name node))])) | |
graph | |
(:predecessors node))] | |
graph)) | |
graph | |
(:nodes subtopology)))) | |
(graph/digraph) | |
(:subtopologies description))))) | |
(view (topology "in" "out")) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment