Skip to content

Instantly share code, notes, and snippets.

@cddr
Created September 14, 2016 04:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cddr/48ca44a3cb0a6054735279a4d7b69731 to your computer and use it in GitHub Desktop.
Save cddr/48ca44a3cb0a6054735279a4d7b69731 to your computer and use it in GitHub Desktop.
(ns kafka-streams.embedded
"Lifecycle components for an embedded Kafka cluster.
The components defined here use the component lifecycle to model
the dependencies between the various services (zookeeper, kafka broker
and kafka client).
The harness-system function returns a system containing all the relevant
components, and starting that system starts the components in the
correct order (i.e. zookeeper, kafka, client).
When adding a new component here, please ensure its `start` method
*blocks* until the component is ready to accept requests from other
components"
(:require
[clojure.java.io :as io]
[clojure.string :as str]
[kafka-streams.config :as config]
[manifold.stream :as s]
[manifold.deferred :as d]
[com.stuartsierra.component :as component])
(:import
(kafka.server KafkaConfig KafkaServerStartable)
(java.net InetSocketAddress)
(org.apache.kafka.clients.producer KafkaProducer)
(org.apache.kafka.clients.consumer KafkaConsumer ConsumerRebalanceListener)
(org.apache.zookeeper.server ZooKeeperServer ServerCnxnFactory)))
(defn zookeeper-port
  "Parse the zookeeper port out of a kafka server config.

  Reads the \"zookeeper.connect\" entry, which may be a single
  \"host:port\", a comma-separated list of them, and/or end in a chroot
  path (e.g. \"zk1:2181,zk2:2181/kafka\"). Returns the port of the FIRST
  host as a long. Throws ex-info when the entry is missing or blank, or
  when the first host has no port."
  [config]
  (let [connect (get config "zookeeper.connect")]
    (when (str/blank? connect)
      (throw (ex-info "zookeeper.connect is not set" {:config config})))
    (let [host-port (-> connect
                        (str/split #",")
                        first
                        ;; drop an optional chroot suffix ("host:port/chroot")
                        (str/split #"/")
                        first)
          ;; use the LAST colon so a bracketed IPv6 host like [::1]:2181 works
          sep (.lastIndexOf ^String host-port ":")]
      (when (neg? sep)
        (throw (ex-info "zookeeper.connect does not specify a port"
                        {:zookeeper.connect connect})))
      ;; Long/parseLong rather than read-string: read-string can evaluate
      ;; #=() reader forms and will happily return non-numeric values.
      (Long/parseLong (str/trim (subs host-port (inc sep)))))))
(defn tmp-dir
  "Return, as a String path, the location of `parts` nested under an
  \"embedded-kafka\" directory inside the JVM temp directory (the
  `java.io.tmpdir` system property). Nothing is created on disk."
  [& parts]
  (let [tmp-root (System/getProperty "java.io.tmpdir")
        segments (list* "embedded-kafka" parts)]
    (.getPath (apply io/file tmp-root segments))))
;; Component wrapping an in-process ZooKeeperServer. The client port comes
;; from config's "zookeeper.connect"; snapshot and transaction-log data live
;; under tmp-dir. After `start`, :result is a deferred that `stop` completes,
;; and :thread is a background future that yields :result's value once the
;; connection factory has shut down.
(defrecord ZooKeeper [config zk factory result]
  component/Lifecycle
  (start [this]
    (if factory
      ;; Already started: return as-is so `start` is idempotent.
      this
      (do
        (println "Starting zookeeper...")
        (let [tick-time 500
              snapshot-dir (tmp-dir "zookeeper-snapshot")
              log-dir (tmp-dir "zookeeper-log")
              zk (ZooKeeperServer. (io/file snapshot-dir)
                                   (io/file log-dir)
                                   tick-time)
              result (d/deferred)
              factory (doto (ServerCnxnFactory/createFactory)
                        ;; second arg is maxClientCnxns; 0 presumably means
                        ;; "no limit" - NOTE(review): confirm for this ZK version
                        (.configure (-> (zookeeper-port config)
                                        (InetSocketAddress.))
                                    0))]
          (.startup factory zk)
          (assoc this
                 :zk zk
                 :factory factory
                 :result result
                 ;; Waits for the factory to shut down, makes sure the server
                 ;; itself is shut down too, then yields whatever `stop`
                 ;; delivered to `result`.
                 :thread (d/future
                           (.join factory)
                           (when (.isRunning zk)
                             (.shutdown zk))
                           @result))))))
  (stop [this]
    (println "Stopping zookeeper...")
    ;; Guard on factory so stopping a never-started (or already-stopped)
    ;; component is a no-op instead of operating on nil fields.
    (when factory
      (try
        (-> (org.apache.zookeeper.jmx.MBeanRegistry/getInstance)
            (.unregisterAll))
        (.shutdown factory)
        (d/success! result :shutdown)
        (catch Exception e
          ;; best-effort shutdown; a second failure must not prevent the
          ;; error from being recorded on `result`
          (try
            (.shutdown factory)
            (catch Exception _ nil))
          (d/error! result e))))
    (assoc this :zk nil, :factory nil)))
(defn zookeeper
  "Build an unstarted ZooKeeper component; `config` supplies the
  \"zookeeper.connect\" entry its port is read from at start time."
  [config]
  (-> {:config config}
      map->ZooKeeper))
;; Component wrapping an embedded Kafka broker built from `config` (via
;; config/props). After `start`, :result is a deferred that `stop` completes
;; and :thread is a future yielding that value. `stop` also deletes the
;; contents of config's "log.dir", when one is configured.
(defrecord Kafka [zookeeper config broker result thread]
  component/Lifecycle
  (start [this]
    (if broker
      ;; Already started: return as-is so `start` is idempotent.
      this
      (let [broker (-> (config/props config)
                       (KafkaConfig.)
                       (KafkaServerStartable.))
            result (d/deferred)]
        (.startup broker)
        (println "Started kafka broker")
        (assoc this
               :broker broker
               :result result
               :thread (d/future @result)))))
  (stop [this]
    (println "Stopping kafka broker")
    ;; Every field may be nil if start never ran or failed part-way; guard
    ;; each use so stop is always safe to call.
    (try
      (when broker
        (.shutdown broker))
      (when result
        (d/success! result :shutdown))
      (catch Exception e
        (when result
          (d/error! result e)))
      (finally
        ;; (io/file nil) + file-seq would NPE, so skip cleanup when no
        ;; log.dir is configured.
        (when-let [log-dir (get config "log.dir")]
          (try
            ;; file-seq lists a directory before its children; reverse it so
            ;; children are deleted first and directories can be removed.
            (doseq [f (reverse (file-seq (io/file log-dir)))]
              (println "Kafka cleanup: " f)
              (.delete ^java.io.File f))
            (catch Exception e
              ;; cleanup is best-effort; report instead of masking stop
              (println "Kafka cleanup failed: " (.getMessage e)))))))
    (assoc this :broker nil)))
(defn kafka
  "Build an unstarted Kafka broker component from the broker `config` map."
  [config]
  (-> {:config config}
      map->Kafka))
(defn consumer-balancer
  "Return a ConsumerRebalanceListener for `consumer`.

  When partitions are revoked, it records each partition's current position
  in the `offsets` atom (a map keyed by partition). When partitions are
  assigned, it seeks `consumer` back to any position previously recorded
  for them."
  [offsets consumer]
  (reify
    ConsumerRebalanceListener
    (onPartitionsRevoked [_ partitions]
      (doseq [p partitions]
        ;; KafkaConsumer#position takes the partition to query; the
        ;; previous zero-arg call matched no method at runtime.
        (swap! offsets assoc p (.position consumer p))))
    (onPartitionsAssigned [_ partitions]
      (doseq [p partitions]
        (when-let [position (get @offsets p)]
          (.seek consumer p position))))))
;; The Harness sets up a single producer and consumer and sends every record
;; the consumer receives, as {:topic :key :value} maps, through `log-stream`.
;; A pre-supplied :producer / :consumer is used instead of creating one.
(defrecord Harness [kafka config producer consumer offsets log-stream]
  component/Lifecycle
  (start [this]
    (if log-stream
      ;; Already started: a second start would leak another poll loop.
      this
      (do
        (println "Starting test client")
        (let [props (config/props config)
              producer (or producer (KafkaProducer. props))
              consumer (or consumer (KafkaConsumer. props))
              offsets (atom {})
              log-stream (s/stream)
              rebalance (consumer-balancer offsets consumer)]
          ;; ".*" subscribes to every topic; the previous #"\*" only matched
          ;; a topic literally named "*".
          (.subscribe consumer #".*" rebalance)
          (assoc this
                 :producer producer
                 :consumer consumer
                 :offsets offsets
                 :log-stream log-stream
                 ;; This future is the only user of `consumer` until `stop`
                 ;; wakes it up and waits for it to finish.
                 :thread (d/future
                           (try
                             (loop []
                               (when-not (s/closed? log-stream)
                                 (doseq [r (.poll consumer 100)]
                                   ;; NOTE(review): put! is not awaited, so
                                   ;; there is no backpressure if nobody
                                   ;; reads log-stream.
                                   (s/put! log-stream {:topic (.topic r)
                                                       :key (.key r)
                                                       :value (.value r)}))
                                 (recur)))
                             ;; raised by poll after `stop` calls .wakeup
                             (catch org.apache.kafka.common.errors.WakeupException _
                               :stopped))))))))
  (stop [this]
    ;; These should be present but we guard against nil so that we can safely
    ;; clean up after ourselves after an error.
    ;;
    ;; https://github.com/stuartsierra/component#idempotence
    (when log-stream
      ;; makes the poll loop exit at its next check
      (s/close! log-stream))
    (when consumer
      ;; KafkaConsumer is not thread-safe; wakeup is the call meant to be
      ;; made from another thread, and it aborts a poll in progress.
      (.wakeup consumer))
    (when-let [poller (:thread this)]
      ;; wait (bounded) for the poll loop to release the consumer; a loop
      ;; that died with an error must not prevent the rest of the cleanup
      (try
        (deref poller 5000 nil)
        (catch Exception _ nil)))
    (when producer
      (.close producer))
    (when consumer
      (.close consumer))
    ;; always return the component, even when log-stream was nil
    (assoc this
           :producer nil
           :consumer nil
           :log-stream nil)))
(defn harness
  "Build an unstarted Harness (test client) component from `config`."
  [config]
  (-> {:config config}
      map->Harness))
(defn harness-system
  "Build (without starting) a component system map of
    :zookeeper - the embedded ZooKeeper,
    :kafka     - the embedded broker, depending on :zookeeper,
    :client    - the Harness test client, depending on :kafka and :zookeeper.
  All three components share `config`."
  [config]
  (component/system-map
   :zookeeper (zookeeper config)
   :kafka (component/using
           (kafka config)
           [:zookeeper])
   ;; `harness` is the client constructor; the previous `test-client` is not
   ;; defined anywhere and stopped the namespace from compiling.
   :client (component/using
            (harness config)
            [:kafka :zookeeper])))
;; When bound to an already-started harness system, `with-harness` reuses it
;; (and leaves it running) instead of starting and stopping a fresh one.
(def ^{:dynamic true} *test-harness* nil)
(defmacro with-harness
  "Evaluate `body` with `sym` bound to a started harness system.

  If *test-harness* is non-nil, that system is used and left running.
  Otherwise a system is built from `config` with harness-system, started,
  and stopped once `body` finishes (normally or by exception). If startup
  fails, the partially started system carried in the exception's ex-data is
  stopped in reverse order before the exception is rethrown. Stop methods
  must therefore tolerate components that never started correctly."
  [[sym config] & body]
  ;; Locals introduced inside the syntax-quote use auto-gensyms (e#,
  ;; system#): a bare `e` would become a namespace-qualified symbol, which
  ;; cannot be bound.
  `(let [~sym (or *test-harness*
                  (try
                    (component/start-system (harness-system ~config))
                    (catch clojure.lang.ExceptionInfo e#
                      (when-let [system# (:system (ex-data e#))]
                        (component/stop-system system#))
                      (throw e#))))]
     (try
       ~@body
       (finally
         (when-not *test-harness*
           (component/stop ~sym))))))
(ns kafka-streams.embedded-test
(:require
[clojure.test :refer :all]
[kafka-streams.embedded :refer :all]
[com.stuartsierra.component :as component]
[kafka-streams.embedded :as embedded])
(:import
[org.I0Itec.zkclient ZkClient]
[org.I0Itec.zkclient.exception ZkTimeoutException]))
(def test-config
  {"zookeeper.connect" "localhost:2181"
   "bootstrap.servers" "localhost:9092"
   ;; Keep broker data under the harness's own temp directory. The Kafka
   ;; component's stop deletes whatever is under "log.dir", so each run
   ;; starts clean instead of reusing data from Kafka's own default location.
   "log.dir" (embedded/tmp-dir "kafka-log")})
(deftest test-zk-lifecycle
  (let [connect (get test-config "zookeeper.connect")
        zk-server (-> (embedded/zookeeper test-config)
                      (component/start))]
    (try
      ;; Close the client we open, so its connection and event threads do not
      ;; outlive the test (or keep retrying against the stopped server).
      (let [client (ZkClient. ^String connect)]
        (try
          (is client)
          (finally
            (.close client))))
      (finally
        (let [stopped (component/stop zk-server)]
          (is (not (:factory stopped)))
          (is (not (:zk stopped)))
          (is (= :shutdown @(:result stopped)))
          (is (= :shutdown @(:thread stopped)))
          ;; with the server gone, a short-timeout connection must fail
          (is (thrown? ZkTimeoutException (ZkClient. ^String connect 20))))))))
(deftest test-kafka
  (let [kafka-system (-> (component/system-map
                          :zookeeper (zookeeper test-config)
                          :kafka (component/using
                                  (kafka test-config)
                                  [:zookeeper]))
                         (component/start-system))]
    (try
      ;; a started system should hold a live broker
      (is (:broker (:kafka kafka-system)))
      (finally
        (let [stopped (-> (component/stop kafka-system)
                          :kafka)]
          ;; Inspect the *stopped* kafka component. Looking up :broker on the
          ;; system map (as before) is always nil, so that check always passed.
          (is (not (:broker stopped)))
          (is (= :shutdown @(:result stopped)))
          (is (= :shutdown @(:thread stopped))))))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment