@nathanmarz
Created September 20, 2011 04:01
Example of defining a Storm topology in Clojure
(use 'backtype.storm.clojure)
(use 'backtype.storm.config)
(require '[backtype.storm [thrift :as thrift]])
(import 'storm.starter.spout.RandomSentenceSpout)
(import 'backtype.storm.LocalCluster)
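
;; Parameterized bolt: appends `suffix` to the first value of each input tuple.
(defboltfull suffix-bolt ["word"]
  :params [suffix]
  :let [conf-state (atom nil)]
  ;; Store the conf map passed to prepare.
  :prepare ([conf context collector]
            (reset! conf-state conf))
  ;; Emit the suffixed value anchored to the input tuple, then ack the input.
  :execute ([tuple collector]
            (.emit collector tuple [(str (.getValue tuple 0) suffix)])
            (.ack collector tuple)))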

;; Simple bolt: appends "!!!" to the first value of each input tuple.
(defbolt exclamation-bolt ["word"] [tuple collector]
  (.emit collector tuple [(str (.getValue tuple 0) "!!!")])
  (.ack collector tuple))

;; Component ids: spout 1 feeds bolt 2, which feeds bolt 3, both via shuffle grouping.
(defn mk-topology []
  (thrift/mk-topology
   {1 (thrift/mk-spout-spec (RandomSentenceSpout.) :parallelism-hint 4)}
   {2 (thrift/mk-bolt-spec {1 :shuffle} (suffix-bolt "!!!") :parallelism-hint 3)
    3 (thrift/mk-bolt-spec {2 :shuffle} exclamation-bolt :parallelism-hint 3)}))

;; Run the topology on an in-process cluster for 10 seconds, then shut it down.
(def cluster (LocalCluster.))
(.submitTopology cluster "test" {TOPOLOGY-DEBUG true} (mk-topology))
(Thread/sleep 10000)
(.shutdown cluster)