Skip to content

Instantly share code, notes, and snippets.

@mccraigmccraig
Created October 23, 2018 01:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save mccraigmccraig/37f8f89a5e414eb75dd8140759980dd0 to your computer and use it in GitHub Desktop.
Save mccraigmccraig/37f8f89a5e414eb75dd8140759980dd0 to your computer and use it in GitHub Desktop.
(defn kafka-streams-router-builder
[app
{routing-topic :routing-topic
effects-topic :effects-topic
:as router-opts}]
(let [builder (StreamsBuilder.)]
(-> builder
(.stream routing-topic (Consumed/with
(k.serde/string-serde)
(k.serde/edn-serde)))
(.flatMapValues
(reify ValueMapper
(apply [_ v]
;; (info "routing" v)
(routing/route-message app v))))
(.through effects-topic (Produced/with
(k.serde/string-serde)
(k.serde/edn-serde)))
(.foreach
(reify ForeachAction
(apply [_ k v]
;; (info "effects" v)
(effects/apply-effect app v)))))
builder))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment