@SeanTAllen
Last active August 29, 2015 14:05
Having an "ummm" moment with this Storm code
;; max-buffer-size, which is a configuration option, is no longer used as of 0.9.2 but is still in the function signature. Bug?
(defn- mk-receive-thread [context storm-id port transfer-local-fn daemon kill-fn priority socket max-buffer-size thread-id]
  (async-loop
    (fn []
      (log-message "Starting receive-thread: [stormId: " storm-id ", port: " port ", thread-id: " thread-id " ]")
      (fn []
        (let [batched (ArrayList.)
              ^Iterator iter (.recv ^IConnection socket 0 thread-id)
              closed (atom false)]
          (when iter
            (while (and (not @closed) (.hasNext iter))
              (let [packet (.next iter)
                    task (if packet (.task ^TaskMessage packet))
                    message (if packet (.message ^TaskMessage packet))]
                (if (= task -1)
                  (do (log-message "Receiving-thread:[" storm-id ", " port "] received shutdown notice")
                      (.close socket)
                      (reset! closed true))
                  (when packet (.add batched [task message]))))))
          (when (not @closed)
            (do
              (if (> (.size batched) 0)
                (transfer-local-fn batched))
              0)))))
    :factory? true
    :daemon daemon
    :kill-fn kill-fn
    :priority priority
    :thread-name (str "worker-receiver-thread-" thread-id)))