@timmc
Last active December 19, 2016 18:19
Short-circuiting reduce with RxJava
(require '[rx.lang.clojure.core :as rx])

(defn o-sc-reduce
  "Perform a short-circuiting reduce on a series of Observables.

  Reduces the series of Observables using the reduction function `f`,
  starting the accumulator with `init`. Inputs to the reduction are
  provided by subscribing to the input Observables serially, similar to
  concat. If at any point (including the initial value) the accumulator
  satisfies `final?`, emit the accumulator, complete, and do not
  subscribe to any more of the inputs. Any error encountered similarly
  short-circuits.

  Note that `f` accepts values from the input Observables, not the
  Observables themselves."
  [f init final? & observables]
  (rx/observable*
   (fn start-combiner [subscriber]
     (let [state (atom {:streams observables
                        :accum init
                        :done? false})]
       (letfn [;; Complete normally -- call next and completed, mark done.
               (emit-answer []
                 (when-not (rx/unsubscribed? subscriber)
                   (rx/on-next subscriber (:accum @state))
                   (rx/on-completed subscriber))
                 (swap! state assoc :done? true))
               ;; Complete exceptionally with Throwable t, mark done.
               (emit-error [^Throwable t]
                 (when-not (rx/unsubscribed? subscriber)
                   (rx/on-error subscriber t))
                 (swap! state assoc :done? true))
               ;; Each of the three event handlers checks the value of
               ;; `done?` before doing any processing, in order to enforce
               ;; the short-circuiting.
               (recv-next [incoming]
                 (when-not (:done? @state)
                   (try
                     (let [result (f (:accum @state) incoming)]
                       (swap! state assoc :accum result)
                       (when (final? result)
                         ;; Short-circuit (general case)
                         (emit-answer)))
                     (catch Throwable t
                       (emit-error t)))))
               (recv-error [t]
                 (when-not (:done? @state)
                   (emit-error t)))
               (recv-completed []
                 (when-not (:done? @state)
                   ;; We haven't short-circuited, so move on
                   ;; to the next stream.
                   (step)))
               ;; Step into the next stream.
               (step []
                 (try
                   (if-let [^rx.Observable stream (first (:streams @state))]
                     ;; Subscribe to the next stream and process events
                     ;; from it; when it completes, `recv-completed`
                     ;; will call `step` again.
                     (do
                       (swap! state update-in [:streams] next)
                       (rx/subscribe stream
                                     recv-next recv-error recv-completed))
                     ;; Reached the end, no more streams; return the
                     ;; accumulator.
                     (emit-answer))
                   (catch Throwable t
                     (emit-error t))))]
         ;; Kick off the evaluation by subscribing to the first stream
         ;; (or finish right away if there are no streams or the initial
         ;; accumulator is already final).
         (try
           (if (final? (:accum @state))
             (emit-answer)
             (step))
           (catch Throwable t
             (emit-error t))))))))
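The semantics described in the docstring may be easier to see on plain sequences. The sketch below is not part of the gist: `sc-reduce` is a hypothetical helper that mirrors the same contract (check `init` first, consume the inputs in order, stop as soon as `final?` holds) using Clojure's built-in `reduced`. The trailing `comment` form shows how the Observable version might be called; it is unverified and assumes that `o-sc-reduce` is loaded and that `rx.lang.clojure.blocking` from the same RxJava Clojure bindings is on the classpath.

```clojure
(defn sc-reduce
  "Sequence analogue of o-sc-reduce (hypothetical helper for illustration).
  Reduces the concatenation of `colls` with `f`, starting from `init`, and
  stops as soon as the accumulator satisfies `final?`."
  [f init final? & colls]
  (if (final? init)
    ;; The initial accumulator is already final: consume nothing.
    init
    (reduce (fn [acc x]
              (let [result (f acc x)]
                ;; `reduced` makes `reduce` stop and return `result`.
                (if (final? result)
                  (reduced result)
                  result)))
            init
            (apply concat colls))))

;; Stops at 1+2+3+4 = 10; the 5 and the third collection never reach `f`.
(sc-reduce + 0 #(>= % 10) [1 2 3] [4 5] [6 7])
;; => 10

;; Never reaches the threshold, so the full sum is returned.
(sc-reduce + 0 #(>= % 100) [1 2] [3])
;; => 6

(comment
  ;; Unverified sketch of calling the Observable version.
  (require '[rx.lang.clojure.core :as rx]
           '[rx.lang.clojure.blocking :as b])
  (b/single
   (o-sc-reduce + 0 #(>= % 10)
                (rx/seq->o [1 2 3])
                (rx/seq->o [4 5])
                (rx/seq->o [6 7]))))
```

One difference worth noting: the sequence version simply stops pulling elements, whereas `o-sc-reduce` as written keeps its current subscription open and ignores further events once `done?` is set.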