Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Example of defining a topology in Clojure
(use 'backtype.storm.clojure)
(use 'backtype.storm.config)
(require '[backtype.storm [thrift :as thrift]])
(import 'storm.starter.spout.RandomSentenceSpout)
(import 'backtype.storm.LocalCluster)
;; suffix-bolt: a parameterized bolt declaring a single output field, "word".
;; It is constructed with one argument, `suffix`.
;;   :let     - creates an atom that lives alongside the bolt's functions.
;;   :prepare - stores the conf it receives into that atom (the atom is not
;;              read anywhere else in this block).
;;   :execute - reads value 0 of the incoming tuple, appends `suffix`, emits the
;;              result together with the incoming tuple, then acks the tuple.
(defboltfull suffix-bolt ["word"]
  :params [suffix]
  :let [conf-state (atom nil)]
  :prepare ([conf context collector]
             (reset! conf-state conf))
  :execute ([tuple collector]
             (let [word     (.getValue tuple 0)
                   suffixed (str word suffix)]
               (.emit collector tuple [suffixed])
               (.ack collector tuple))))
;; exclamation-bolt: a bolt declaring a single output field, "word".
;; For each incoming tuple it reads value 0, appends "!!!", emits the result
;; together with the incoming tuple, and then acks the tuple.
(defbolt exclamation-bolt ["word"] [tuple collector]
  (let [word    (.getValue tuple 0)
        excited (str word "!!!")]
    (.emit collector tuple [excited])
    (.ack collector tuple)))
;; Builds the example topology and returns whatever thrift/mk-topology returns.
;; Components are keyed by integer id:
;;   1 - RandomSentenceSpout, parallelism hint 4
;;   2 - suffix-bolt built with "!!!", input {1 :shuffle}, parallelism hint 3
;;   3 - exclamation-bolt, input {2 :shuffle}, parallelism hint 3
(defn mk-topology []
  (let [sentence-spout (thrift/mk-spout-spec (RandomSentenceSpout.)
                                             :parallelism-hint 4)
        suffix-stage   (thrift/mk-bolt-spec {1 :shuffle}
                                            (suffix-bolt "!!!")
                                            :parallelism-hint 3)
        exclaim-stage  (thrift/mk-bolt-spec {2 :shuffle}
                                            exclamation-bolt
                                            :parallelism-hint 3)
        spouts         {1 sentence-spout}
        bolts          {2 suffix-stage
                        3 exclaim-stage}]
    (thrift/mk-topology spouts bolts)))
;; Run the topology on an in-process LocalCluster: submit it under the name
;; "test" with TOPOLOGY-DEBUG enabled, let it run for 10 seconds, then shut the
;; cluster down.
(def cluster (LocalCluster.))
(try
  (.submitTopology cluster "test" {TOPOLOGY-DEBUG true} (mk-topology))
  (Thread/sleep 10000)
  (finally
    ;; Always shut down, even if submission or the sleep throws; otherwise the
    ;; cluster created above would be left running.
    (.shutdown cluster)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.