Skip to content

Instantly share code, notes, and snippets.

@sherpc
Last active April 24, 2017 07:12
Show Gist options
  • Save sherpc/2829d6a5481e92e28afbf92404d50264 to your computer and use it in GitHub Desktop.
Save sherpc/2829d6a5481e92e28afbf92404d50264 to your computer and use it in GitHub Desktop.
Kafka streams proof of concept
(ns flock-staff.kafka
(:require [cheshire.core :as json])
(:import [flock_staff.kafka StringProcessor StringForeachAction]
[org.apache.kafka.common.serialization Serdes]
[org.apache.kafka.streams.processor TopologyBuilder ProcessorSupplier]
[org.apache.kafka.streams KafkaStreams StreamsConfig]
[org.apache.kafka.clients.consumer ConsumerConfig]
[java.util Properties]))
(defn processor
[handle-fn]
(let [counter (atom 0)]
(proxy [StringProcessor] []
(init [context] (println "INIT" this (.getId (Thread/currentThread))))
(process [k v]
(swap! counter inc)
(handle-fn (json/decode v keyword)))
(punctuate [timestamp]
(println timestamp))
(close [] (println @counter "CLOSE" (.getId (Thread/currentThread)))))))
(defn ->properties
[m]
(let [result (Properties.)]
(doseq [[k v] m]
(.put result k v))
result))
(comment
;; prn here is fn that will be call on each message in kafka stream "profiles"
(let [p (processor prn)
ps (reify ProcessorSupplier
(get [_] p))
string-serde (-> (Serdes/String)
.getClass
.getName)
builder (->
(TopologyBuilder.)
(.addSource "Source" (into-array String ["profiles"]))
(.addProcessor "Process" ps (into-array String ["Source"]))
)
props (->properties
{StreamsConfig/APPLICATION_ID_CONFIG "streams-echo"
StreamsConfig/BOOTSTRAP_SERVERS_CONFIG "10.19.0.200:9092,10.19.0.97:9092,10.19.0.213:9092"
StreamsConfig/ZOOKEEPER_CONNECT_CONFIG "10.19.0.135:2181"
StreamsConfig/KEY_SERDE_CLASS_CONFIG string-serde
StreamsConfig/VALUE_SERDE_CLASS_CONFIG string-serde
;; ConsumerConfig/AUTO_OFFSET_RESET_CONFIG "earliest"
})
streams (KafkaStreams. builder props)]
(.start streams)
(Thread/sleep 30000)
(.close streams)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment