Skip to content

Instantly share code, notes, and snippets.

Created January 13, 2012 19:45
Show Gist options
  • Save anonymous/1608336 to your computer and use it in GitHub Desktop.
Save anonymous/1608336 to your computer and use it in GitHub Desktop.
Reliable word-count topology
(ns topologies.core
(:import [backtype.storm StormSubmitter LocalCluster])
(:require [clojure.test :as test])
(:use [backtype.storm clojure config])
(:gen-class))
;; Global counter used to generate message ids for tuples emitted by the
;; spout below: each emit does (swap! id-count inc) and uses the result as
;; the tuple's :id. The atom is namespace-level state, so every spout
;; instance created in this JVM draws from the same sequence.
(def id-count (atom 0)) ;; tuple counter for debugging -- something to make ids out of
;; Spout that emits one randomly chosen sentence about every 10 ms on the
;; "sentence" field. Each tuple is emitted with a message id taken from
;; `id-count`, so the ack/fail callbacks below are invoked for it.
;;
;; Reliability: every emitted sentence is remembered in `pending` (keyed by
;; its message id) until it is acked. When a tuple fails, the same sentence
;; is re-emitted under the same id, so a failure no longer drops the tuple.
;; Acks and failures are still appended to /tmp/sentence-spout-ack and
;; /tmp/sentence-spout-fail for debugging.
(defspout sentence-spout ["sentence"]
  [conf context collector]
  (let [sentences ["a little brown dog"
                   "the man petted the dog"
                   "four score and seven years ago"
                   "an apple a day keeps the doctor away"]
        ;; message id -> sentence, for tuples emitted but not yet acked
        pending (atom {})]
    (spout
      (nextTuple []
        ;; throttle emission so the topology is not flooded
        (Thread/sleep 10)
        (let [sentence (rand-nth sentences)
              id (swap! id-count inc)]
          ;;(spit "/tmp/sentence-spout" (str "sentence-spout emitting tuple " id ": " sentence "\n") :append true)
          ;; record before emitting so a later fail can always find it
          (swap! pending assoc id sentence)
          (emit-spout! collector [sentence] :id id)))
      (ack [id]
        ;; fully processed: nothing left to replay for this id
        (swap! pending dissoc id)
        (spit "/tmp/sentence-spout-ack" (str "acked: " id "\n") :append true))
      (fail [id]
        (spit "/tmp/sentence-spout-fail" (str "failed: " id "\n") :append true)
        ;; replay the failed sentence under its original id; it stays in
        ;; `pending` until that replay is acked
        (when-let [sentence (get @pending id)]
          (emit-spout! collector [sentence] :id id))))))
;; Bolt that reads the string in field 0 of the incoming tuple, splits it on
;; single spaces, and emits one "word" tuple per piece. Every emitted word is
;; anchored to the input tuple, and the input is acked once all words are out.
(defbolt split-sentence ["word"] [tuple collector]
  ;(spit "/tmp/split-sentence" (str "split-sentence called with: " (.getString tuple 0) "\n") :append true)
  (doseq [piece (.split (.getString tuple 0) " ")]
    (emit-bolt! collector [piece] :anchor tuple))
  (ack! collector tuple))
;; Prepared bolt that keeps a running tally of how many times each word
;; (field 0 of the incoming tuple) has been seen. The tally lives in an atom
;; created when the bolt is prepared. The bolt declares ["word" "count"] as
;; output fields, but the emit is currently disabled (commented out below),
;; so each tuple is only counted and acked.
(defbolt word-count ["word" "count"] {:prepare true}
  [conf context collector]
  (let [tally (atom {})]
    (bolt
      (execute [tuple]
        (let [w (.getString tuple 0)]
          ;(spit "/tmp/word-count" (str "word-count called with word: " w "\n") :append true)
          ;; add 1 to the existing count, or start the word at 1
          (swap! tally #(merge-with + % {w 1}))
          ;(emit-bolt! collector [w (@tally w)] :anchor tuple)
          (ack! collector tuple))))))
;; Builds the word-count topology:
;;   "1"  sentence-spout
;;   "3"  split-sentence, shuffle-grouped on "1", parallelism 5
;;   "4"  word-count, fields-grouped on "word" from "3", parallelism 6
(defn mk-topology []
  (let [spouts   {"1" (spout-spec sentence-spout)}
        splitter (bolt-spec {"1" :shuffle} split-sentence :p 5)
        counter  (bolt-spec {"3" ["word"]} word-count :p 6)]
    (topology spouts
              {"3" splitter
               "4" counter})))
;; Runs the topology on an in-process LocalCluster under the name
;; "word-count" with TOPOLOGY-DEBUG enabled, lets it run for `run-ms`
;; milliseconds (10000 when called with no arguments), then shuts the
;; cluster down.
;;
;; The shutdown is in a `finally` so the cluster is released even if
;; submission or the sleep throws (for example on thread interruption).
(defn run-local!
  ([] (run-local! 10000))
  ([run-ms]
   (let [cluster (LocalCluster.)]
     (try
       (.submitTopology cluster "word-count" {TOPOLOGY-DEBUG true} (mk-topology))
       (Thread/sleep run-ms)
       (finally
         (.shutdown cluster))))))
;; Entry point for the generated class: submits the topology through
;; StormSubmitter under the given name, with debug output off and 3 workers.
;; Note: the parameter `name` shadows clojure.core/name inside this function.
(defn -main [name]
  (let [submit-conf {TOPOLOGY-WORKERS 3
                     TOPOLOGY-DEBUG false}]
    (StormSubmitter/submitTopology name submit-conf (mk-topology))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment