Skip to content

Instantly share code, notes, and snippets.

Created Sep 25, 2014
What would you like to do?
(ns example.main
(:use [backtype.storm clojure config log]))
(defbolt split-sentence ["word"]
[tuple collector]
(let [words (.split (.getString tuple 0) " ")]
(doseq [w words]
(emit-bolt! collector [w] :anchor tuple))
(ack! collector tuple)))
(defbolt word-count ["word" "count"]
{:prepare true}
[conf context collector]
(let [counts (atom {})]
(bolt (execute
(let [word (.getString tuple 0)]
(swap! counts
(partial merge-with +) {word 1})
(emit-bolt! collector
[word (@counts word)]
:anchor tuple)
(ack! collector tuple))))))
(defspout sentence-spout ["sentence"]
[conf context collector]
(let [sentences
["a little brown dog" "the man petted the dog"
"four score and seven years ago"
"an apple a day keeps the doctor away"]]
(Thread/sleep 100)
;(log-message "DEBUG>>>>>>>>>>" (rand-nth sentences))
(emit-spout! collector
[(rand-nth sentences)]))
;; You only need to define this method for reliable spouts
;; (such as one that reads off of a queue like Kestrel)
;; This is an unreliable spout, so it does nothing here
(defn -main
[& args]
(let [conf {TOPOLOGY-DEBUG true, TOPOLOGY-MAX-SPOUT-PENDING 1, "logFile" (first args)}
topos (topology
{"1" (spout-spec sentence-spout)}
{"3" (bolt-spec
{"1" :shuffle}
:p 5)
"4" (bolt-spec
{"3" ["word"]}
:p 6)})
cluster (local-cluster)
_ (.submitTopology cluster "APIv2-Monitor-Topology" conf topos)]
(Thread/sleep 10000)
(.shutdown cluster)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment