Skip to content

Instantly share code, notes, and snippets.

@halfelf
Created March 31, 2014 08:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save halfelf/9887429 to your computer and use it in GitHub Desktop.
Save halfelf/9887429 to your computer and use it in GitHub Desktop.
Reliable word count example for storm in Clojure. Note the `ack` and `fail` parts in spout.
(ns storm.starter.clj.word-count2
(:import [backtype.storm StormSubmitter LocalCluster])
(:use [backtype.storm clojure config log])
(:gen-class))
(def id-count (atom 0)) ;; tuple counter for debugging -- something to make ids out of
(defspout sentence-spout ["sentence"]
[conf context collector]
(let [sentences ["a little brown dog"
"the man petted the dog"
"four score and seven years ago"
"an apple a day keeps the doctor away"]]
(spout
(nextTuple []
(Thread/sleep 10)
(let [sentence (rand-nth sentences)
id (swap! id-count inc)]
(emit-spout! collector [sentence] :id id)
))
(ack [id]
(log-message "Acking " id))
(fail [id]
(log-message "Failing " id)))))
(defbolt split-sentence ["word"] [tuple collector]
(let [sentence (.getString tuple 0)
words (.split sentence " ")]
(doseq [w words]
(emit-bolt! collector [w] :anchor tuple))
(ack! collector tuple)
))
(defbolt word-count ["word" "count"] {:prepare true}
[conf context collector]
(let [counts (atom {})]
(bolt
(execute [tuple]
(let [word (.getString tuple 0)]
(swap! counts (partial merge-with +) {word 1})
;(emit-bolt! collector [word (@counts word)] :anchor tuple)
(ack! collector tuple)
)))))
(defn mk-topology []
(topology
{"1" (spout-spec sentence-spout)}
{"3" (bolt-spec {"1" :shuffle}
split-sentence
:p 5)
"4" (bolt-spec {"3" ["word"]}
word-count
:p 6)}))
(defn run-local! []
(let [cluster (LocalCluster.)]
(.submitTopology cluster "word-count" {TOPOLOGY-DEBUG true} (mk-topology))
(Thread/sleep 10000)
(.shutdown cluster)
))
(defn -main [name]
(StormSubmitter/submitTopology
name
{TOPOLOGY-DEBUG false
TOPOLOGY-WORKERS 3}
(mk-topology)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment