Skip to content

Instantly share code, notes, and snippets.

@bfabry
Created January 11, 2017 20:39
Show Gist options
  • Save bfabry/d297dd39739a38ee220033debbb19ab6 to your computer and use it in GitHub Desktop.
Save bfabry/d297dd39739a38ee220033debbb19ab6 to your computer and use it in GitHub Desktop.
(ns kafka
(:require [clojure.core.async :as async :refer [<!! >!! thread]]
[com.stuartsierra.component :as component]
[clojure.tools.logging :as log])
(:import (kafka.consumer ConsumerTimeoutException)
(org.I0Itec.zkclient.exception ZkTimeoutException)
(kafka.javaapi.consumer ZookeeperConsumerConnector)
(java.util Iterator)
(kafka.message MessageAndMetadata)))
(defn event->map [^MessageAndMetadata event]
{:key (String. ^"[B" (.key event))
:message (String. ^"[B" (.message event))})
(defn process-event [send-channel raw-event]
(let [event (event->map raw-event)]
(>!! send-channel event)))
(defn message-stream [consumer-map topic]
(first (.get consumer-map topic)))
(defn connect-consumer [config]
(let [topic-count-map {(:topic config) (int 1)}
connector (ZookeeperConsumerConnector. (:config config))
consumer-map (.createMessageStreams connector topic-count-map)
iterator (.iterator (message-stream consumer-map (:topic config)))]
[connector iterator]))
(defn consume [^Iterator iterator send-channel keep-running?]
(try
(while (and (.hasNext iterator) @keep-running?)
(let [item (.next iterator)]
(process-event send-channel item)))
(catch ConsumerTimeoutException e
(log/debug "Caught: " e))))
(defn sleep [duration-ms]
(Thread/sleep duration-ms))
(defn shutdown-connector [connector]
(.shutdown connector))
(defn try-consuming [send-channel config keep-running?]
(try
(let [[connector iterator] (connect-consumer config)]
(consume iterator send-channel keep-running?)
(shutdown-connector connector)
true)
(catch ZkTimeoutException e
(log/error "Caught: " e)
false)))
(defn run-loop [send-channel config keep-running?]
(loop [wait-time-ms 5000]
(when (and @keep-running? (< wait-time-ms 500000))
(let [consume-successful? (try-consuming send-channel config keep-running?)
new-wait-time-ms (if consume-successful?
5000
(do
(log/debug "sleep for "wait-time-ms)
(sleep wait-time-ms)
(* 2 wait-time-ms)))]
(recur new-wait-time-ms)))))
(defn shutdown! [keep-running? runner-thread]
(reset! keep-running? false)
(<!! runner-thread))
(defrecord KafkaConsumer [processor config]
component/Lifecycle
(start [component]
(log/info "KafkaConsumer starting up")
(let [send-channel (:event-channel processor)
keep-running? (atom true)
runner-thread (thread (run-loop send-channel config keep-running?))]
(assoc component
:send-channel send-channel
:config config
:keep-running? keep-running?
:runner-thread runner-thread)))
(stop [component]
(shutdown! (:keep-running? component) (:runner-thread component))
(log/info "KafkaConsumer shutdown")
(dissoc component
:send-channel
:config
:keep-running?
:runner-thread)))
(defn kafka-consumer []
(map->KafkaConsumer {}))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment