Skip to content

Instantly share code, notes, and snippets.

@halfelf
Created October 2, 2013 06:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save halfelf/6789945 to your computer and use it in GitHub Desktop.
Save halfelf/6789945 to your computer and use it in GitHub Desktop.
Simple Kafka Consumer in Clojure
(ns oceanus.anduin.clj.consumer
(:gen-class)
(:import [kafka.consumer ConsumerConfig Consumer KafkaStream]
[kafka.javaapi.consumer ConsumerConnector]
[java.util Properties]))
(defn make-props
"convert a clojure map into a Properties object."
[m]
(let [props (Properties.)]
(doseq [[k v] m]
(.put props k (str v)))
props))
(defn get-streams-map [conf topics]
(-> conf make-props ConsumerConfig.
Consumer/createJavaConsumerConnector
(.createMessageStreams topics)))
(defn get-streams [props topic total-partitions]
(for [i (range total-partitions)]
(-> props
(get-streams-map {topic (int 1)})
(.get topic)
first)))
(defn get-one-stream [props topic]
(-> props
(get-streams-map {topic (int 1)})
(.get topic)
first))
(defn count-events [props topic total-partitions]
"example of usage"
(let [counter (atom 0)]
(doseq [stream (get-streams props topic total-partitions)]
(.start
(Thread.
#(doseq [m stream]
(swap! counter inc)))))
(Thread/sleep 15000)
@counter))
(comment
{"zookeeper.connect" "localhost:2181"
"zk.connectiontimeout.ms" 1000000
"group.id" "group1",
"fetch.size" 2097152,
"socket.receive.buffer.bytes" 65536,
"auto.commit.interval.ms" 60000,
"queued.max.messages" 10}
) ; A property example
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment