Skip to content

Instantly share code, notes, and snippets.

@mccraigmccraig
Created November 2, 2018 18:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save mccraigmccraig/bb994e23888d6a90d0071ec0ecaf4b94 to your computer and use it in GitHub Desktop.
Save mccraigmccraig/bb994e23888d6a90d0071ec0ecaf4b94 to your computer and use it in GitHub Desktop.
(ns er-model.routing.router.kafka-streams
(:require
[manifold.deferred :as deferred]
[er-model.connectors.kafka.serde :as k.serde]
[er-model.connectors.kafka.config :as cfg]
[er-model.routing :as routing]
[er-model.routing.effects :as effects]
[taoensso.timbre :refer [info]])
(:import
[org.apache.kafka.streams KafkaStreams StreamsBuilder]
[org.apache.kafka.streams.kstream
Consumed Produced ValueMapper KeyValueMapper ForeachAction]))
(defn kafka-streams-router-app
[app
router-kafka-opts
{routing-topic :routing-topic
effects-topic :effects-topic
:as topics}]
(info "building kafka-streams-router"
{:router-kafka-opts router-kafka-opts
:topics topics})
(let [props (cfg/config-properties-with-brokers router-kafka-opts)
builder (StreamsBuilder.)]
(-> builder
(.stream routing-topic (Consumed/with
(k.serde/string-serde)
(k.serde/edn-serde)))
(.flatMapValues
(reify ValueMapper
(apply [_ v]
;; (info "routing" v)
(routing/route-message app v))))
(.to effects-topic (Produced/with
(k.serde/string-serde)
(k.serde/edn-serde))))
(KafkaStreams. (.build builder) props)))
(defn kafka-streams-routing-effects-app
[app
routing-effects-kafka-opts
{effects-topic :effects-topic
:as topics}]
(info "building kafka-streams-routing-effects-processor"
{:routing-effects-kafka-opts routing-effects-kafka-opts
:topics topics})
(let [props (cfg/config-properties-with-brokers routing-effects-kafka-opts)
builder (StreamsBuilder.)]
(-> builder
(.stream effects-topic (Consumed/with
(k.serde/string-serde)
(k.serde/edn-serde)))
(.foreach
(reify ForeachAction
(apply [_ k v]
;; (info "effects" v)
(effects/apply-effect app v)))))
(KafkaStreams. (.build builder) props)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment