Skip to content

Instantly share code, notes, and snippets.

@gjcourt
Created August 30, 2012 18:35
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gjcourt/3536950 to your computer and use it in GitHub Desktop.
Save gjcourt/3536950 to your computer and use it in GitHub Desktop.
;; Bolt that pairs each incoming event with a category and re-emits it as a
;; ["category" "event"] tuple, anchored to the input tuple, then acks the input.
;;
;; The event is the string in field 0 of the incoming tuple. The category is
;; currently a hard-coded placeholder value.
;;
;; The tuple is emitted on the default stream. The vector output spec on this
;; bolt declares only the default stream, so emitting with an explicit
;; `:stream "3"` targeted a stream that was never declared and that a plain
;; component-id subscription (e.g. {"2" ["category"]}) does not listen to.
(defbolt split-category ["category" "event"]
  [tuple collector]
  (let [event    (.getString tuple 0)
        category "testing_category2"]
    (emit-bolt! collector [category event] :anchor tuple)
    (ack! collector tuple)))
; TODO: ensure buffered files are uploaded on schedule even when no new events arrive.
; TODO: don't upload empty files (still need to test how to detect an empty file).
;; Prepared bolt that buffers events per category into gzip-compressed temp
;; files and uploads a category's file when it is rolled over.
;;
;; Input tuple: field 0 is the category string, field 1 is the event string.
;; Each event is written followed by a newline.
;;
;; A category's file is rolled over (new temp file + stream opened, previous
;; one finished, closed and uploaded) when any of the following holds:
;;   * no file exists yet for the category,
;;   * at least *nearest-n-minutes* minutes have elapsed since the file was
;;     started,
;;   * writing this event would bring the byte count to *file-size* or more.
;;
;; Per-category state lives in plain atoms. Nothing here runs inside a
;; `dosync`: the rollover performs I/O (temp-file creation, stream close,
;; upload) and atom swaps, none of which may safely sit in an STM transaction
;; because transactions can be retried and do not roll back such effects.
;; NOTE(review): assumes Storm invokes `execute` for a given bolt task from a
;; single thread, so no cross-atom coordination is required — confirm.
;;
;; NOTE(review): `temp-file`, `gzip-stream`, `s3-loc`, `upload-file`,
;; `utf8-byte-array`, `now`, `interval` and `in-minutes` are defined outside
;; this block; their exact semantics are assumed from usage.
(defbolt buffer-and-upload [] {:prepare true}
  [conf context collector]
  (let [tfile     (atom {})   ; category -> current temp file
        streams   (atom {})   ; category -> open gzip stream on that file
        beginning (atom {})   ; category -> time the current file was started
        written   (atom {})]  ; category -> bytes written to the current file
    (bolt
      (execute [tuple]
        (let [category  (.getString tuple 0)
              event     (str (.getString tuple 1) \newline)
              bytes     (utf8-byte-array event)
              num-bytes (alength bytes)
              end       (now)
              file      (@tfile category)
              stream    (@streams category)
              ;; Keep using the current file only if it exists, is still
              ;; inside its time window, and has room for this event.
              keep?     (and file
                             (< (in-minutes (interval (@beginning category) end))
                                *nearest-n-minutes*)
                             (< (+ (@written category) num-bytes) *file-size*))]
          (when-not keep?
            (let [new-file (temp-file ".gz")
                  dest     (s3-loc category end *nearest-n-minutes* ".gz")]
              (swap! tfile assoc category new-file)
              (swap! streams assoc category (gzip-stream new-file))
              (swap! beginning assoc category (now))
              (swap! written assoc category 0)
              ;; On the very first event for a category there is no previous
              ;; file/stream, so there is nothing to finalize or upload.
              (when (and file stream)
                (.finish stream)
                (.close stream)
                (upload-file category file dest))))
          ;; Re-read the stream: it may have just been replaced above.
          (.write (@streams category) bytes)
          (swap! written update-in [category] (fnil + 0) num-bytes)
          (ack! collector tuple))))))
;; Bolt that does nothing with its input except acknowledge it.
;; It declares no output fields and emits nothing.
(defbolt noop []
  [input out]
  (ack! out input))
(defn mk-topology
  "Builds the event-processing topology.

  Components (all with parallelism 1):
    \"1\" - spout/kafka-spout
    \"2\" - split-category, shuffle-grouped on the output of \"1\"
    \"3\" - buffer-and-upload, fields-grouped on \"category\" from \"2\""
  []
  (let [spouts {"1" (spout-spec spout/kafka-spout :p 1)}
        bolts  {"2" (bolt-spec {"1" :shuffle} split-category :p 1)
                "3" (bolt-spec {"2" ["category"]} buffer-and-upload :p 1)}]
    (topology spouts bolts)))
(defn run-local!
  "Runs the topology on an in-process LocalCluster under the name
  \"events-test\" with TOPOLOGY-DEBUG enabled, lets it run for one minute,
  then shuts the cluster down.

  The cluster is shut down in a `finally` so that it is released even when
  submitting the topology or sleeping throws (e.g. on thread interruption)."
  []
  (let [cluster (LocalCluster.)]
    (try
      (.submitTopology cluster "events-test" {TOPOLOGY-DEBUG true} (mk-topology))
      (Thread/sleep (* 1 60 1000))
      (finally
        (.shutdown cluster)))))
(defn submit-topology!
  "Submits the topology built by `mk-topology` through StormSubmitter under
  `topology-name`, configured with 2 workers and TOPOLOGY-DEBUG enabled."
  [topology-name]
  (let [conf {TOPOLOGY-WORKERS 2
              TOPOLOGY-DEBUG   true}]
    (StormSubmitter/submitTopology topology-name conf (mk-topology))))
(defn -main
  "Entry point.

  With no arguments, runs the topology locally via `run-local!`.
  With one argument, submits the topology under that name via
  `submit-topology!`."
  ([] (run-local!))
  ([topology-name] (submit-topology! topology-name)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment