Skip to content

Instantly share code, notes, and snippets.

@tristanstraub
Created May 27, 2019 23:30
Show Gist options
  • Save tristanstraub/1c0fc048f83a7e6b6f223d532b5afee7 to your computer and use it in GitHub Desktop.
Save tristanstraub/1c0fc048f83a7e6b6f223d532b5afee7 to your computer and use it in GitHub Desktop.
#!/bin/sh
#! Polyglot launcher: this file is read both by sh and by the Clojure reader.
#! sh treats every line starting with '#' as a comment. The Clojure reader
#! treats '#!' lines as comments and discards the whole parenthesised form
#! that follows '#_', so it skips the shell commands below.
#_(
#_DEPS is same format as deps.edn. Multiline is okay.
DEPS='{:deps {aysylu/loom {:mvn/version "1.0.2"}
org.apache.kafka/kafka-streams {:mvn/version "2.2.0"}}}'
#_You can put other options here
OPTS='-J-Xms256m -J-Xmx256m -J-client'
#! $OPTS is left unquoted so the shell splits it into separate arguments.
#! exec replaces the shell with clojure, re-running this same file ("$0") as a
#! Clojure script, so sh never reads past this line.
exec clojure $OPTS -Sdeps "$DEPS" "$0" "$@"
)
(require '[loom.graph :as graph]
'[loom.io :as loom.io])
(import '(org.apache.kafka.streams TopologyDescription$Source TopologyDescription$Sink)
'(org.apache.kafka.streams.kstream ValueMapper)
'(org.apache.kafka.streams StreamsBuilder))
(defn topology
  "Builds a minimal Kafka Streams topology that copies records from topic
  `in` to topic `out`, passing every value through an identity ValueMapper.

  in  - name of the source topic (string)
  out - name of the sink topic (string)

  Returns the Topology produced by the StreamsBuilder."
  [in out]
  (let [builder (StreamsBuilder.)]
    ;; The mapped stream must be written with `.to`; otherwise `out` is unused
    ;; and the resulting topology has no sink node.
    (-> (.stream builder in)
        (.mapValues (reify ValueMapper
                      (apply [_ v]
                        v)))
        (.to out))
    (.build builder)))
(defn topology-description
  "Converts the TopologyDescription of `topology` into plain Clojure data.

  Returns a map with:
    :global-stores - one map per global store, holding the processor's
                     :stores and the source's :topics
    :subtopologies - one map per subtopology with its :id and :nodes

  Every node map has :name, :predecessors and :successors (the latter two
  as sequences of node names), plus a type-specific key:
    source nodes    -> :topics
    sink nodes      -> :topic
    processor nodes -> :stores"
  [topology]
  (let [description (.describe topology)]
    {:global-stores
     (for [global-store (.globalStores description)]
       {:processor {:stores (.stores (.processor global-store))}
        :source    {:topics (.topics (.source global-store))}})

     :subtopologies
     (for [subtopology (.subtopologies description)]
       {:id    (.id subtopology)
        :nodes (for [node (.nodes subtopology)]
                 (merge {:name         (.name node)
                         :predecessors (map #(.name %) (.predecessors node))
                         :successors   (map #(.name %) (.successors node))}
                        (cond
                          (instance? TopologyDescription$Source node)
                          {:topics (.topics node)}

                          (instance? TopologyDescription$Sink node)
                          {:topic (.topic node)}

                          ;; Previously this branch re-tested Sink and was
                          ;; unreachable. The class is fully qualified because
                          ;; the file's import form only brings in Source and
                          ;; Sink.
                          (instance? org.apache.kafka.streams.TopologyDescription$Processor node)
                          {:stores (.stores node)})))})}))
(defn index-by
  "Returns a map from (k item) to item for every item in `coll`.
  When several items yield the same key, the last one in `coll` wins.
  An empty or nil `coll` gives an empty map."
  [k coll]
  ;; Each item becomes a [key item] pair; `into` conj's the pairs onto the
  ;; map in order, so later pairs overwrite earlier ones.
  (into {} (map (juxt k identity)) coll))
(defn view
  "Describes `topology` with `topology-description`, builds a loom digraph
  from the result and passes it to `loom.io/view`.

  Graph nodes are the node maps from the description. For each node, one
  edge is added from every predecessor (looked up by name within the same
  subtopology) to that node."
  [topology]
  (let [add-subtopology
        (fn [g subtopology]
          (let [nodes   (:nodes subtopology)
                by-name (index-by :name nodes)]
            (reduce (fn [g current]
                      (let [target (by-name (:name current))]
                        ;; Add the node first so isolated nodes still appear,
                        ;; then one edge per predecessor.
                        (reduce (fn [g predecessor-name]
                                  (graph/add-edges g [(by-name predecessor-name)
                                                      target]))
                                (graph/add-nodes g target)
                                (:predecessors current))))
                    g
                    nodes)))
        description (topology-description topology)]
    (loom.io/view (reduce add-subtopology
                          (graph/digraph)
                          (:subtopologies description)))))
;; Script entry point: build the topology with "in" and "out" as its
;; arguments and hand the result to `view`.
(view (topology "in" "out"))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment