Last active
August 29, 2015 14:05
-
-
Save SeanTAllen/48fabdd93ff465329898 to your computer and use it in GitHub Desktop.
Having an "ummm" moment with this Storm code
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; max-buffer-size, which is a configuration option, is no longer used as of 0.9.2 but is still in the method signature. Bug?
(defn- mk-receive-thread
  "Starts, via `async-loop`, the receive loop for one receiver thread of `socket`.

  Each pass calls `.recv` on `socket` (flags 0, `thread-id`), collects every
  non-nil packet as a `[task message]` pair and, when at least one pair was
  collected, hands the batch to `transfer-local-fn`. A packet whose task is -1
  is treated as a shutdown notice: the socket is closed, the pass stops, and
  whatever was batched during that pass is not transferred.

  The loop body returns 0 after a normal pass and nil once the socket has been
  closed (presumably async-loop treats the number as a sleep time and nil as a
  stop signal -- confirm against async-loop).

  `daemon`, `kill-fn` and `priority` are forwarded to `async-loop` unchanged.
  `context` and `max-buffer-size` are not used by this function; they remain in
  the signature so existing callers keep working."
  [context storm-id port transfer-local-fn daemon kill-fn priority socket max-buffer-size thread-id]
  (async-loop
    (fn []
      (log-message "Starting receive-thread: [stormId: " storm-id ", port: " port ", thread-id: " thread-id " ]")
      (fn []
        (let [batched (ArrayList.)
              ^Iterator iter (.recv ^IConnection socket 0 thread-id)
              ;; Flipped to true when a shutdown notice is seen in this pass.
              closed (atom false)]
          (when iter
            (while (and (not @closed) (.hasNext iter))
              (let [^TaskMessage packet (.next iter)
                    ;; The iterator may yield nil; task and message are nil in that case.
                    task (when packet (.task packet))
                    message (when packet (.message packet))]
                (if (= task -1)
                  (do (log-message "Receiving-thread:[" storm-id ", " port "] received shutdown notice")
                      ;; Hinted so the call is resolved at compile time rather than by reflection.
                      (.close ^IConnection socket)
                      (reset! closed true))
                  (when packet
                    (.add batched [task message]))))))
          (when-not @closed
            (when (pos? (.size batched))
              (transfer-local-fn batched))
            0))))
    :factory? true
    :daemon daemon
    :kill-fn kill-fn
    :priority priority
    :thread-name (str "worker-receiver-thread-" thread-id)))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment