Created
August 27, 2013 15:21
-
-
Save daveray/6355008 to your computer and use it in GitHub Desktop.
chunk a sequence of observables
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(defn chunk
  "Same as rx.Observable.merge(Observable<Observable<T>>) but the input Observables
  are \"chunked\" so that at most chunk-size of them are \"in flight\" at any given
  time.

  The order of the input Observables is not preserved.

  The main purpose here is to allow a large number of Hystrix observables to
  be processed in a controlled way so that the Hystrix execution queues aren't
  overwhelmed.

  Arguments:
    chunk-size         maximum number of inner Observables in flight at once.
                       Should be positive: with a value of zero or less no
                       buffered Observable is ever released, so the result
                       never terminates once anything has been buffered.
    options            optional map:
                         :debug         when truthy, print
                                        complete/buffered-count/in-flight-count
                                        after every state transition
                         :delay-error?  when truthy, combine the inner
                                        Observables with mergeDelayError
                                        instead of merge
    observable-source  an Observable that emits Observables.

  Returns the Observable produced by rx.Observable/merge (or
  rx.Observable/mergeDelayError) over the throttled stream of inner
  Observables.

  Buffered Observables are released in arrival (FIFO) order. An error from
  observable-source is forwarded only after every Observable that was already
  buffered has been released. The terminal notification (completion or error)
  is forwarded downstream exactly once.

  Example:

    (->> users
         (map #(-> (GetUserCommand. %) .toObservable))
         (chunk 10))

  See:
    http://netflix.github.io/RxJava/javadoc/rx/Observable.html#merge(rx.Observable)
    http://netflix.github.io/RxJava/javadoc/rx/Observable.html#mergeDelayError(rx.Observable)
  "
  ([chunk-size observable-source] (chunk chunk-size {} observable-source))
  ([chunk-size options observable-source]
   (let [;; One state atom is created per subscriber.
         new-state-atom #(atom {:in-flight  #{}     ; observables currently in-flight
                                ;; observables waiting to be emitted; a persistent
                                ;; queue keeps conj (append) / peek / pop in FIFO order
                                :buffered   clojure.lang.PersistentQueue/EMPTY
                                :complete   false   ; false, :complete, or the Throwable from observable-source
                                :terminated false   ; true once the terminal event has been claimed
                                :observer   %})     ; the observer

         ;; Debug helper: prints complete/buffered-count/in-flight-count.
         ps #(do (printf "%s/%d/%d%n"
                         (:complete %)
                         (-> % :buffered count)
                         (-> % :in-flight count))
                 (flush))

         ;; Pure transition function. Returns [todo new-state] where todo is
         ;;   nil          - nothing to do
         ;;   :complete    - forward completion to the observer
         ;;   a Throwable  - forward the error to the observer
         ;;   an Observable - emit it downstream (it has been moved to :in-flight)
         next-state (fn [{:keys [complete buffered in-flight terminated] :as old}]
                      (cond
                        ;; The terminal event has already been handed out; never
                        ;; hand it out again when late in-flight observables finish.
                        terminated
                        [nil old]

                        (empty? buffered)
                        (if complete
                          [complete (assoc old :terminated true)]
                          [nil old])

                        (< (count in-flight) chunk-size)
                        (let [next-o (peek buffered)]
                          [next-o
                           (-> old
                               (update-in [:buffered] pop)
                               (update-in [:in-flight] conj next-o))])

                        :else
                        [nil old]))

         ;; Applies next-state with a compare-and-set! retry loop, then performs
         ;; the side effect selected by todo. Side effects run only in the
         ;; caller whose CAS succeeded, so each todo is acted on once.
         advance! (fn advance! [state-atom]
                    (let [old-state @state-atom
                          [todo new-state] (next-state old-state)]
                      (if (compare-and-set! state-atom old-state new-state)
                        (let [observer (:observer new-state)]
                          (when (:debug options)
                            (ps new-state))
                          (cond
                            (= :complete todo)
                            (rx/on-completed observer)

                            (instance? Throwable todo)
                            (rx/on-error observer todo)

                            (instance? rx.Observable todo)
                            (rx/on-next observer
                                        ;; When the inner observable finishes,
                                        ;; free its slot and try to release the
                                        ;; next buffered one.
                                        (.finallyDo ^rx.Observable todo
                                                    (reify rx.util.functions.Action0
                                                      (call [this]
                                                        (swap! state-atom update-in [:in-flight] disj todo)
                                                        (advance! state-atom)))))))
                        ;; Another thread changed the state first; retry.
                        (recur state-atom))))

         ;; Subscribes to the source: buffer each inner observable, record the
         ;; terminal event, and call advance! after every change.
         subscribe (fn [state-atom]
                     (rx/subscribe observable-source
                                   (fn [o]
                                     (swap! state-atom update-in [:buffered] conj o)
                                     (advance! state-atom))
                                   (fn [e]
                                     (swap! state-atom assoc :complete e)
                                     (advance! state-atom))
                                   (fn []
                                     (swap! state-atom assoc :complete :complete)
                                     (advance! state-atom))))

         ;; Observable of throttled inner observables.
         observable (rx/fn->o (fn [observer]
                                (subscribe (new-state-atom observer))))]
     (if (:delay-error? options)
       (rx.Observable/mergeDelayError observable)
       (rx.Observable/merge observable)))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment