@gjcourt
Created August 30, 2012 18:32
(ns example.storm.clj.spout.kafka-spout
  (:import ; [example.storm.spout UnreliableKafkaSpout]
           [storm.kafka HostPort KafkaSpout SpoutConfig StringScheme]))

(def ^:dynamic *kafka-hosts* ["kafka-1.example.net"
                              "kafka-2.example.net"
                              "kafka-3.example.net"])

(def ^:dynamic *kafka-ports* [9093
                              9094
                              9095])

(defn mk-spout-config
  [kafka-hosts kafka-ports]
  (let [config (SpoutConfig.
                 (map #(HostPort. %1 %2) kafka-hosts kafka-ports)
                 ; kafka-hosts,
                 1                 ; Number of partitions per host
                 "gjcourt-events"  ; Topic to read from
                 "/kafkastorm"     ; the root path in Zookeeper for the spout to store the consumer offsets
                 "storm-consumers" ; Id for this consumer for storing the consumer offsets in Zookeeper
                 )]
    (set! (. config scheme) (StringScheme.))
    ; (. config forceStartOffsetTime (int -2))
    config))

(def kafka-spout
  (KafkaSpout.
    (mk-spout-config *kafka-hosts* *kafka-ports*)))
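
;; A minimal sketch, not part of the original gist, of how the two-collection
;; `map` in mk-spout-config pairs each host with a port. Plain vectors stand in
;; for HostPort so it can be evaluated without storm-kafka on the classpath.
;; `pair-hosts` is a hypothetical helper name introduced only for illustration.
(defn pair-hosts
  [hosts ports]
  ;; mapv is the eager counterpart of map; like map, it stops at the shorter
  ;; collection, so a missing port silently drops the corresponding host.
  (mapv vector hosts ports))

(comment
  ;; Two hosts but only one port: the second host is dropped without an error.
  (pair-hosts ["kafka-1.example.net" "kafka-2.example.net"] [9093])
  ;; => [["kafka-1.example.net" 9093]]
  )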