Last active
December 19, 2016 18:19
Short-circuiting reduce with RxJava
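(require '[rx.lang.clojure.core :as rx])
;; Imported so the ^Observable type hint in `step` resolves to rx.Observable.
(import '[rx Observable])
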
(defn o-sc-reduce
  "Perform a short-circuiting reduce on a series of Observables.

  Reduces the series of Observables using the reduction function `f`,
  starting the accumulator with `init`. Inputs to the reduction are
  provided by subscribing to the input Observables serially, similar to
  concat. If at any point (including the initial value) the accumulator
  satisfies `final?`, emit the accumulator, complete, and do not
  subscribe to any more of the inputs. Any error encountered similarly
  short-circuits.

  Note that `f` accepts values from the input Observables, not the
  Observables themselves."
  [f init final? & observables]
  (rx/observable*
   (fn start-combiner [subscriber]
     (let [state (atom {:streams observables
                        :accum init
                        :done? false})]
       (letfn [;; Complete normally -- call next and completed, mark done.
               (emit-answer []
                 (when-not (rx/unsubscribed? subscriber)
                   (rx/on-next subscriber (:accum @state))
                   (rx/on-completed subscriber))
                 (swap! state assoc :done? true))
               ;; Complete exceptionally with Throwable t, mark done.
               (emit-error [^Throwable t]
                 (when-not (rx/unsubscribed? subscriber)
                   (rx/on-error subscriber t))
                 (swap! state assoc :done? true))
               ;; Each of the three event handlers checks the value of
               ;; `done?` before doing any processing, in order to enforce
               ;; the short-circuiting.
               (recv-next [incoming]
                 (when-not (:done? @state)
                   (try
                     (let [result (f (:accum @state) incoming)]
                       (swap! state assoc :accum result)
                       (when (final? result)
                         ;; Short-circuit (general case)
                         (emit-answer)))
                     (catch Throwable t
                       (emit-error t)))))
               (recv-error [t]
                 (when-not (:done? @state)
                   (emit-error t)))
               (recv-completed []
                 (when-not (:done? @state)
                   ;; We haven't short-circuited, so move on
                   ;; to the next stream
                   (step)))
               ;; Step into next stream
               (step []
                 (try
                   (if-let [^Observable stream (first (:streams @state))]
                     ;; Subscribe to the next stream and process events
                     ;; from it; when it completes, `recv-completed`
                     ;; will call `step` again.
                     (do
                       (swap! state update-in [:streams] next)
                       (rx/subscribe stream
                                     recv-next recv-error recv-completed))
                     ;; Reached the end, no more streams; return the
                     ;; accumulator.
                     (emit-answer))
                   (catch Throwable t
                     (emit-error t))))]
         ;; Kick off the evaluation by subscribing to the first stream
         ;; (or finish right away if there are no streams or the initial
         ;; accumulator is already final).
         (try
           (if (final? (:accum @state))
             (emit-answer)
             (step))
           (catch Throwable t
             (emit-error t))))))))
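
;; Usage sketch (not part of the original gist). It assumes the
;; rx.lang.clojure.blocking namespace from the same RxJava Clojure
;; bindings is on the classpath, and uses its `first` to block for the
;; single emitted value. The input sequences and the threshold of 10 are
;; illustrative only.
(require '[rx.lang.clojure.blocking :as b])

(comment
  ;; Sum values across three Observables, stopping as soon as the running
  ;; total exceeds 10. With these synchronous inputs the total passes 10
  ;; partway through the second Observable (1+2+3+4+5), so the third is
  ;; never subscribed to and the expected result is 15.
  (b/first
   (o-sc-reduce + 0 #(> % 10)
                (rx/seq->o [1 2 3])
                (rx/seq->o [4 5 6])
                (rx/seq->o [7 8 9])))

  ;; If the initial accumulator already satisfies `final?`, no input is
  ;; subscribed to and `init` is emitted as-is (here, 100).
  (b/first
   (o-sc-reduce + 100 #(> % 10)
                (rx/seq->o [1 2 3]))))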