Created
September 25, 2014 23:31
-
-
Save soasme/3f45ec3f3456583cd257 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns example.main | |
(:use [backtype.storm clojure config log])) | |
;; Bolt that breaks an incoming sentence into words.
;; Input : a tuple whose field 0 is read as a string.
;; Output: one tuple per space-separated token, declared as field "word".
(defbolt split-sentence ["word"]
  [tuple collector]
  ;; Iterate the String[] returned by String.split directly; every emitted
  ;; word is anchored to the source tuple.
  (doseq [token (.split (.getString tuple 0) " ")]
    (emit-bolt! collector [token] :anchor tuple))
  ;; Ack the input only after all of its words have been emitted.
  (ack! collector tuple))
;; Prepared bolt that keeps a running count for each word it receives.
;; Input : a tuple whose field 0 is read as a string (the word).
;; Output: [word count] declared as fields "word" and "count", where count
;;         is the total held in this bolt's own atom after the increment.
(defbolt word-count ["word" "count"]
  {:prepare true}
  [conf context collector]
  ;; The atom is created once in the prepare step and closed over by execute.
  (let [tally (atom {})]
    (bolt
     (execute [tuple]
       (let [w (.getString tuple 0)]
         ;; A missing key is treated as 0, so the first sighting yields 1.
         (swap! tally update-in [w] (fnil inc 0))
         (emit-bolt! collector
                     [w (get @tally w)]
                     :anchor tuple)
         (ack! collector tuple))))))
;; Spout that emits a randomly chosen sentence from a fixed list,
;; sleeping 100 ms before each emit. Output field: "sentence".
(defspout sentence-spout ["sentence"]
  [conf context collector]
  (let [corpus ["a little brown dog"
                "the man petted the dog"
                "four score and seven years ago"
                "an apple a day keeps the doctor away"]]
    (spout
     (nextTuple []
       ;; Throttle emission: sleep first, then pick and emit.
       (Thread/sleep 100)
       ;(log-message "DEBUG>>>>>>>>>>" (rand-nth sentences))
       (let [sentence (rand-nth corpus)]
         ;; Emitted without a message id.
         (emit-spout! collector [sentence])))
     (ack [id]
       ;; Only reliable spouts (e.g. one reading off a queue such as
       ;; Kestrel) need to implement this. This spout is unreliable,
       ;; so acking is intentionally a no-op.
       ))))
(defn -main
  "Runs the word-count topology on an in-process local cluster for about
  ten seconds, then shuts the cluster down.

  args: the first command-line argument, if any, is stored in the topology
        config under the key \"logFile\" (nil when no argument is given).

  Topology wiring:
    \"1\" sentence-spout
    \"3\" split-sentence, shuffle-grouped on \"1\", parallelism 5
    \"4\" word-count, fields-grouped on \"word\" from \"3\", parallelism 6

  Returns nil."
  [& args]
  (let [;; NOTE(review): sentence-spout emits without message ids, so
        ;; TOPOLOGY-MAX-SPOUT-PENDING presumably has no effect here --
        ;; confirm against the Storm Config documentation.
        conf {TOPOLOGY-DEBUG true
              TOPOLOGY-MAX-SPOUT-PENDING 1
              "logFile" (first args)}
        topos (topology
               {"1" (spout-spec sentence-spout)}
               {"3" (bolt-spec {"1" :shuffle}
                               split-sentence
                               :p 5)
                "4" (bolt-spec {"3" ["word"]}
                               word-count
                               :p 6)})
        cluster (local-cluster)]
    ;; Shut the cluster down even if submission throws or the sleep is
    ;; interrupted; otherwise the in-process cluster would be left running.
    (try
      (.submitTopology cluster "APIv2-Monitor-Topology" conf topos)
      (Thread/sleep 10000)
      (finally
        (.shutdown cluster)))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment