@soasme
Created September 25, 2014 23:31
(ns example.main
  (:use [backtype.storm clojure config log]))
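
;; Splits each incoming sentence on single spaces and emits one tuple per
;; word, anchored to the input tuple, then acks the input.
(defbolt split-sentence ["word"]
  [tuple collector]
  (let [words (.split (.getString tuple 0) " ")]
    (doseq [w words]
      (emit-bolt! collector [w] :anchor tuple))
    (ack! collector tuple)))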
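
;; A minimal sketch (not part of the original gist) of the splitting step in
;; isolation, which may be handy for checking it at a REPL without a running
;; topology. `split-words` is a hypothetical helper name; it mirrors the
;; (.split sentence " ") call used in split-sentence.
(defn split-words
  "Splits a sentence on single spaces and returns a vector of words."
  [^String sentence]
  (vec (.split sentence " ")))

(comment
  (split-words "a little brown dog")
  ;; => ["a" "little" "brown" "dog"]
  )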

;; Keeps a running count per word in an atom local to each bolt task and
;; emits the updated [word count] pair for every input tuple.
(defbolt word-count ["word" "count"]
  {:prepare true}
  [conf context collector]
  (let [counts (atom {})]
    (bolt
      (execute [tuple]
        (let [word (.getString tuple 0)]
          (swap! counts (partial merge-with +) {word 1})
          (emit-bolt! collector [word (@counts word)] :anchor tuple)
          (ack! collector tuple))))))
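
;; A minimal sketch (an illustration, not part of the original gist) of the
;; counting logic used by word-count, pulled out into a pure function so it
;; can be tried without Storm. `count-word` is a hypothetical name; it does
;; the same merge-with update that the swap! call applies to the atom.
(defn count-word
  "Returns counts with the tally for word incremented by one."
  [counts word]
  (merge-with + counts {word 1}))

(comment
  (reduce count-word {} ["the" "man" "petted" "the" "dog"])
  ;; => {"the" 2, "man" 1, "petted" 1, "dog" 1}
  )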

(defspout sentence-spout ["sentence"]
  [conf context collector]
  (let [sentences ["a little brown dog"
                   "the man petted the dog"
                   "four score and seven years ago"
                   "an apple a day keeps the doctor away"]]
    (spout
      (nextTuple []
        (Thread/sleep 100)
        ;; (log-message "DEBUG>>>>>>>>>>" (rand-nth sentences))
        (emit-spout! collector [(rand-nth sentences)]))
      (ack [id]
        ;; You only need to define this method for reliable spouts
        ;; (such as one that reads off of a queue like Kestrel).
        ;; This is an unreliable spout, so it does nothing here.
        ))))
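
;; Runs the topology on an in-process local cluster for ten seconds.
;; The first command-line argument is passed through as the "logFile" entry.
(defn -main
  [& args]
  (let [conf {TOPOLOGY-DEBUG true
              TOPOLOGY-MAX-SPOUT-PENDING 1
              "logFile" (first args)}
        topos (topology
                {"1" (spout-spec sentence-spout)}
                ;; Bolt "3" reads spout "1" with shuffle grouping.
                {"3" (bolt-spec {"1" :shuffle}
                                split-sentence
                                :p 5)
                 ;; Bolt "4" reads bolt "3" with fields grouping on "word".
                 "4" (bolt-spec {"3" ["word"]}
                                word-count
                                :p 6)})
        cluster (local-cluster)]
    (.submitTopology cluster "APIv2-Monitor-Topology" conf topos)
    (Thread/sleep 10000)
    (.shutdown cluster)))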
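
;; A rough illustration (not Storm's actual implementation) of why the
;; {"3" ["word"]} fields grouping in -main matters: tuples carrying the same
;; "word" are routed to the same word-count task, so that task's local counts
;; atom sees every occurrence of the words it owns.
;; `task-for-word` is a hypothetical helper, and hashing modulo the task
;; count is an assumption used only to show the idea.
(defn task-for-word
  "Picks a task index in [0, num-tasks) that depends only on word."
  [word num-tasks]
  (mod (hash word) num-tasks))

(comment
  ;; The same word always maps to the same index among the 6 word-count tasks.
  (= (task-for-word "dog" 6) (task-for-word "dog" 6))
  ;; => true
  )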