Skip to content

Instantly share code, notes, and snippets.

@pixie79
Created November 27, 2013 15:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save pixie79/7677546 to your computer and use it in GitHub Desktop.
Save pixie79/7677546 to your computer and use it in GitHub Desktop.
Exception in thread "main" java.lang.ClassCastException: Cannot cast clojure.lang.PersistentList to storm.kafka.KafkaConfig$BrokerHosts, compiling:(word_count_kafka.clj:26)
(ns storm.starter.clj.word-count-kafka
(:import ;[backtype.storm StormSubmitter LocalCluster]
[storm.kafka KafkaConfig HostPort KafkaSpout SpoutConfig StringScheme])
(:use [backtype.storm clojure config])
(:gen-class))
(def ^{:private true}
host (list "localhost:9092"))
(def ^{:private true
:doc "List of Kafka hosts"}
ZkHosts host )
(def ^{:private true
:doc "Topic Name"}
topic "bidder")
(def ^{:private true
:doc "root path of Zookeeper for the spout to store the consumer offsets"}
zkRoot "/kafkastorm")
(def ^{:private true
:doc "id for this consumer for storing the consumer offsets in zookeeper"}
id "discovery")
(def ^{:private true
:doc "kafka spout config definition"}
spout-config (SpoutConfig. ZkHosts topic zkRoot id))
(def kafka-spout
(KafkaSpout. spout-config
)
)
(defbolt split-sentence ["word"] [tuple collector]
(let [words (.split (.getString tuple 0) " ")]
(doseq [w words]
(emit-bolt! collector [w] :anchor tuple))
(ack! collector tuple)
))
(defbolt word-count ["word" "count"] {:prepare true}
[conf context collector]
(let [counts (atom {})]
(bolt
(execute [tuple]
(let [word (.getString tuple 0)]
(swap! counts (partial merge-with +) {word 1})
(emit-bolt! collector [word (@counts word)] :anchor tuple)
(ack! collector tuple)
)))))
(defn mk-topology []
(topology
{"1" (spout-spec kafka-spout)
{"2" (bolt-spec {"1" :shuffle "2" :shuffle}
split-sentence
:p 5)
"3" (bolt-spec {"3" ["word"]}
word-count
:p 6)}))
(defn run-local! []
(let [cluster (LocalCluster.)]
(.submitTopology cluster "word-count-kafka" {TOPOLOGY-DEBUG true} (mk-topology))
(Thread/sleep 10000)
(.shutdown cluster)
))
(defn submit-topology! [name]
(StormSubmitter/submitTopology
name
{TOPOLOGY-DEBUG true
TOPOLOGY-WORKERS 3}
(mk-topology)))
(defn -main
([]
(run-local!))
([name]
(submit-topology! name)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment