@samhandev
Created March 6, 2017 14:15
Based on https://github.com/jameswalton/kafka-streams with explicit creation of objects
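(ns kafka-streams-test.core
  (:import [org.apache.kafka.streams.kstream KStreamBuilder ValueMapper]
           [org.apache.kafka.streams KafkaStreams StreamsConfig]
           [org.apache.kafka.common.serialization Serdes])
  (:gen-class))

;; Streams configuration: application id, broker address, and String serdes
;; for both keys and values.
(def props
  {StreamsConfig/APPLICATION_ID_CONFIG "dictionary-words-count"
   StreamsConfig/BOOTSTRAP_SERVERS_CONFIG "localhost:9092"
   StreamsConfig/KEY_SERDE_CLASS_CONFIG (.getName (.getClass (Serdes/String)))
   StreamsConfig/VALUE_SERDE_CLASS_CONFIG (.getName (.getClass (Serdes/String)))})

;; KStreamBuilder.stream takes String varargs, so the topic name is wrapped
;; in a String array.
(def input-topic
  (into-array String ["my-input-topic"]))

;; Prints the incoming value, then returns (f v).
(defn dofunc-and-print [f v]
  (println (str "The value is " v))
  (f v))

;; The transformation applied to every record value.
(def transform-fn
  (fn [v]
    (str "The sent message is: " v)))

(defn -main [& args]
  (let [config  (StreamsConfig. props)
        builder (KStreamBuilder.)
        ;; Build the topology: read the input topic, map each value through
        ;; an explicitly reified ValueMapper, write to the output topic.
        ;; .to returns void, so the result is not bound to a name.
        _       (-> (.stream builder input-topic)
                    (.mapValues (reify ValueMapper
                                  (apply [_ v] (dofunc-and-print transform-fn v))))
                    (.to "my-output-topic"))
        streams (KafkaStreams. builder config)]
    (prn "starting")
    (.start streams)
    ;; Let the stream run for six seconds, then shut it down.
    (Thread/sleep 6000)
    (.close streams)
    (prn "stopping")))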
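
The -main above stops the stream after a fixed six-second sleep, which is convenient for a quick test. For a longer-running process, one common alternative is to close the stream from a JVM shutdown hook. The following is a minimal sketch of that idea, not part of the original gist: `run-until-shutdown!` is a hypothetical helper name, and it takes plain zero-argument functions so it can be tried at a REPL without a Kafka broker.

```clojure
;; Hypothetical helper (not in the original gist): registers stop-fn as a
;; JVM shutdown hook, then calls start-fn. Returns the hook Thread so the
;; caller can remove it again if needed.
(defn run-until-shutdown!
  [start-fn stop-fn]
  (let [hook (Thread. ^Runnable stop-fn)]
    (.addShutdownHook (Runtime/getRuntime) hook)
    (start-fn)
    hook))

;; Assumed use inside -main, in place of the Thread/sleep and .close calls:
;; (run-until-shutdown! #(.start streams) #(.close streams))
```

With this variant the stream keeps processing until the JVM is asked to exit (for example with Ctrl-C), at which point the hook calls the stop function.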