(:import [ InputStream OutputStream FileInputStream FileOutputStream File])
(:import [java.util List Map])
(:import [ FileUtils IOUtils])
(:use [backtype.storm log config util])
(:import [backtype.storm.utils Utils])
(:import [backtype.storm.nimbus INimbusStorage]))
;; Builds an INimbusStorage implementation backed by the local directory
;; returned by (nimbus-storage-local-dir conf). Every `path` argument of the
;; reified methods is resolved against that directory with join-paths.
;; NOTE(review): the body of isSupportDistributed and the closing parens of
;; this form are not visible in this view -- confirm against the full file.
(defn create-local-storage [conf]
(let [stormroot (nimbus-storage-local-dir conf)]
(log-message "Using default storage (" stormroot ")")
(reify INimbusStorage
;; Opens the file at stormroot/path for reading; the caller owns the stream.
(^InputStream open [this, ^String path]
(FileInputStream. (join-paths stormroot path)))
;; Opens the file at stormroot/path for writing; the caller owns the stream.
(^OutputStream create [this, ^String path]
(FileOutputStream. (join-paths stormroot path)))
;; Lists the entry names under stormroot/path. `seq` turns an empty (or
;; null) listing from File.list into nil.
(^List list [this, ^String path]
(seq (.list (File. (join-paths stormroot path)))))
;; Force-deletes stormroot/path, but only when exists-file? reports that the
;; path is present; otherwise nothing is done.
(^void delete [this, ^String path]
(let [full-path (join-paths stormroot path)]
(when (exists-file? full-path)
(FileUtils/forceDelete (File. full-path)))))
;; Creates the directory stormroot/path via FileUtils/forceMkdir.
(^void mkdirs [this, ^String path]
(FileUtils/forceMkdir (File. (join-paths stormroot path))))
;; NOTE(review): return value not visible here -- presumably false for
;; local (non-shared) storage; confirm.
(^boolean isSupportDistributed [this]
;; Instantiates the storage class named by storage-name via new-instance,
;; calls .init on it with conf, and logs which custom storage is in use.
;; NOTE(review): the tail of this form is not visible in this view --
;; presumably it returns `storage`; confirm against the full file.
(defn create-custom-storage [storage-name conf]
(let [storage (new-instance storage-name)]
(.init storage conf)
(log-message "Using custom storage: " storage-name)
(defn ^INimbusStorage create-nimbus-storage
  "Selects the Nimbus storage backend for `conf`.

  When `conf` has a truthy value under NIMBUS-STORAGE, that value is handed
  to create-custom-storage together with `conf`; otherwise the storage is
  built by create-local-storage."
  [conf]
  (let [custom-name (conf NIMBUS-STORAGE)]
    (if custom-name
      (create-custom-storage custom-name conf)
      (create-local-storage conf))))
(defn list-full-paths
  "Returns a lazy sequence with one entry per name reported by
  `(.list storage path)`, each prefixed with `path` and a \"/\" separator."
  [storage path]
  (for [^String entry (.list storage path)]
    (str path "/" entry)))
(defn upload-file-to-storage
  "Copies the local `file` into `storage` at `path`.

  `file` is whatever the FileInputStream constructor accepts; `storage` must
  provide a `.create` method that returns a closeable output stream for
  `path`. Returns the value of IOUtils/copy.

  Both streams are closed on exit, including when the copy (or opening the
  local file) throws."
  [file storage path]
  ;; The destination is opened first, matching the original ordering.
  ;; with-open closes in reverse order and also covers the input stream,
  ;; which was previously never closed.
  (with-open [out (.create storage path)
              in (FileInputStream. file)]
    (IOUtils/copy in out)))
(defn ensure-clean-dir-in-storage
  "Ensures that directory `path` exists in `storage` and has no entries.

  Creates the directory with `.mkdirs`, then deletes every entry returned
  by list-full-paths one at a time. Returns nil."
  [storage path]
  (.mkdirs storage path)
  ;; `.delete` takes a single path, so each entry is removed individually
  ;; rather than passing the whole sequence of paths in one call.
  (doseq [entry (list-full-paths storage path)]
    (.delete storage entry)))
(defn serialize-to-storage
  "Serializes `obj` with Utils/serialize and writes the resulting bytes to
  `path` in `storage`.

  `storage` must provide a `.create` method that returns a closeable output
  stream for `path`. The stream is closed on exit, including when
  serialization or the write throws."
  [obj storage path]
  (with-open [out (.create storage path)]
    (IOUtils/write (Utils/serialize obj) out)))
(defn deserialize-from-storage
  "Reads all bytes stored at `path` in `storage` and returns the object
  produced by Utils/deserialize.

  `storage` must provide an `.open` method that returns a closeable input
  stream for `path`. The stream is closed on exit, including when reading
  or deserialization throws."
  [storage path]
  (with-open [in (.open storage path)]
    (Utils/deserialize (IOUtils/toByteArray in))))
(ns backtype.storm.nimbus.elections
(:import [backtype.storm.nimbus NimbusLeaderElections])
(:use [backtype.storm config util log]))
;; Branches on whether conf contains the STORM-LOCAL-HOSTNAME key.
;; NOTE(review): both branches of the `if` and the closing parens are not
;; visible in this view -- presumably it yields the configured hostname or
;; falls back to a detected one; confirm against the full file.
(defn local-hostname-conf [conf]
(if (contains? conf STORM-LOCAL-HOSTNAME)
;; Creates a short-lived NimbusLeaderElections, initialises it with conf and
;; a nil third argument, reads the leader address via .getLeaderAddr, and
;; closes the elections object again.
;; NOTE(review): the tail of this form is not visible in this view --
;; presumably it returns `leader-addr`; confirm against the full file.
;; NOTE(review): .close is not in a finally, so it looks like it is skipped
;; if .init or .getLeaderAddr throws -- confirm whether that matters.
(defn get-nimbus-leader-addr [conf]
(let [leader-elections (NimbusLeaderElections.)]
(.init leader-elections conf nil)
(let [leader-addr (.getLeaderAddr leader-elections)]
(.close leader-elections)
;; Creates a short-lived NimbusLeaderElections, initialises it with conf and
;; a nil third argument, reads the Nimbus hosts via .getNimbusHosts, and
;; closes the elections object again.
;; NOTE(review): the tail of this form is not visible in this view --
;; presumably it returns `addr-list`; confirm against the full file.
;; NOTE(review): .close is not in a finally, so it looks like it is skipped
;; if .init or .getNimbusHosts throws -- confirm whether that matters.
(defn get-nimbus-addr-list [conf]
(let [leader-elections (NimbusLeaderElections.)]
(.init leader-elections conf nil)
(let [addr-list (.getNimbusHosts leader-elections)]
(.close leader-elections)
;; Blocks the calling Nimbus until it gains leadership.
;; NOTE(review): the tail of this form is not visible in this view --
;; presumably it returns `leader-elections`; confirm against the full file.
(defn await-leadership [conf storage]
(let [leader-elections (NimbusLeaderElections.)]
;; If a leader is already known, this instance would be a secondary Nimbus;
;; that is only allowed when the storage reports distributed support.
(when-let [leader-addr (get-nimbus-leader-addr conf)]
(log-message "Current Nimbus leader: " leader-addr)
(if-not (.isSupportDistributed storage)
(throw (IllegalStateException. "Trying to start secondary Nimbus with storage that isn't support distributed"))))
;; Initialise with this node's "<hostname>:<thrift-port>" string as the third
;; argument (the read-only helpers in this namespace pass nil here).
(.init leader-elections conf (str (local-hostname-conf conf) ":" (conf NIMBUS-THRIFT-PORT)))
(log-message "Nimbus awaiting for leadership")
(.awaitLeadership leader-elections)
(log-message "Nimbus gained leadership")
(defn ensure-leadership
  "Guards leader-only operations: throws
  backtype.storm.generated.NotALeaderException when `leader-elections`
  reports that leadership is not held; otherwise returns nil."
  [leader-elections]
  (let [leader? (.hasLeadership leader-elections)]
    (when (not leader?)
      (throw (backtype.storm.generated.NotALeaderException.)))))
