-
-
Save mccraigmccraig/bb994e23888d6a90d0071ec0ecaf4b94 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns er-model.routing.router.kafka-streams | |
(:require | |
[manifold.deferred :as deferred] | |
[er-model.connectors.kafka.serde :as k.serde] | |
[er-model.connectors.kafka.config :as cfg] | |
[er-model.routing :as routing] | |
[er-model.routing.effects :as effects] | |
[taoensso.timbre :refer [info]]) | |
(:import | |
[org.apache.kafka.streams KafkaStreams StreamsBuilder] | |
[org.apache.kafka.streams.kstream | |
Consumed Produced ValueMapper KeyValueMapper ForeachAction])) | |
(defn kafka-streams-router-app | |
[app | |
router-kafka-opts | |
{routing-topic :routing-topic | |
effects-topic :effects-topic | |
:as topics}] | |
(info "building kafka-streams-router" | |
{:router-kafka-opts router-kafka-opts | |
:topics topics}) | |
(let [props (cfg/config-properties-with-brokers router-kafka-opts) | |
builder (StreamsBuilder.)] | |
(-> builder | |
(.stream routing-topic (Consumed/with | |
(k.serde/string-serde) | |
(k.serde/edn-serde))) | |
(.flatMapValues | |
(reify ValueMapper | |
(apply [_ v] | |
;; (info "routing" v) | |
(routing/route-message app v)))) | |
(.to effects-topic (Produced/with | |
(k.serde/string-serde) | |
(k.serde/edn-serde)))) | |
(KafkaStreams. (.build builder) props))) | |
(defn kafka-streams-routing-effects-app | |
[app | |
routing-effects-kafka-opts | |
{effects-topic :effects-topic | |
:as topics}] | |
(info "building kafka-streams-routing-effects-processor" | |
{:routing-effects-kafka-opts routing-effects-kafka-opts | |
:topics topics}) | |
(let [props (cfg/config-properties-with-brokers routing-effects-kafka-opts) | |
builder (StreamsBuilder.)] | |
(-> builder | |
(.stream effects-topic (Consumed/with | |
(k.serde/string-serde) | |
(k.serde/edn-serde))) | |
(.foreach | |
(reify ForeachAction | |
(apply [_ k v] | |
;; (info "effects" v) | |
(effects/apply-effect app v))))) | |
(KafkaStreams. (.build builder) props))) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment