@daveray
Created August 27, 2013 15:21
chunk a sequence of observables
(defn chunk
  "Same as rx.Observable.merge(Observable<Observable<T>>) but the input Observables
  are \"chunked\" so that at most chunk-size of them are \"in flight\" at any given
  time.

  The order of the input Observables is not preserved.

  The main purpose here is to allow a large number of Hystrix observables to
  be processed in a controlled way so that the Hystrix execution queues aren't
  overwhelmed.

  options is an optional map:

    :delay-error?  if truthy, merge with mergeDelayError instead of merge
    :debug         if truthy, print complete/buffered/in-flight on each step

  Example:

    (->> users
         (map #(-> (GetUserCommand. %) .toObservable))
         (chunk 10))

  See:

    http://netflix.github.io/RxJava/javadoc/rx/Observable.html#merge(rx.Observable)
    http://netflix.github.io/RxJava/javadoc/rx/Observable.html#mergeDelayError(rx.Observable)
  "
  ([chunk-size observable-source] (chunk chunk-size {} observable-source))
  ([chunk-size options observable-source]
   (let [new-state-atom #(atom {:in-flight #{}   ; observables currently in-flight
                                :buffered  []    ; observables waiting to be emitted
                                :complete  false ; false, :complete, or the Throwable from observable-source
                                :observer  %})   ; the observer
         ;; Debug helper: prints complete/buffered-count/in-flight-count.
         ps #(do (printf "%s/%d/%d%n"
                         (:complete %)
                         (-> % :buffered count)
                         (-> % :in-flight count))
                 (flush))
         ;; Pure transition. Returns [todo new-state], where todo is the next
         ;; observable to emit, the terminal signal, or nil if there is nothing to do.
         next-state (fn [{:keys [complete buffered in-flight] :as old}]
                      (cond
                        (empty? buffered)
                        [(or complete nil) old]

                        (< (count in-flight) chunk-size)
                        (let [next-o (first buffered)]
                          [next-o
                           (-> old
                               ;; `next` yields a seq, so later conj calls prepend;
                               ;; input order is not preserved, as documented.
                               (update-in [:buffered] next)
                               (update-in [:in-flight] conj next-o))])

                        :else
                        [nil old]))
         ;; Apply next-state with compare-and-set!, retrying on contention,
         ;; then act on todo.
         advance! (fn advance! [state-atom]
                    (let [old-state @state-atom
                          [todo new-state] (next-state old-state)]
                      (if (compare-and-set! state-atom old-state new-state)
                        (let [observer (:observer new-state)]
                          (when (:debug options) (ps new-state))
                          (cond
                            (= :complete todo)
                            (rx/on-completed observer)

                            (instance? Throwable todo)
                            (rx/on-error observer todo)

                            (instance? rx.Observable todo)
                            (rx/on-next observer
                                        ;; When the inner observable finishes, free its
                                        ;; slot and try to emit the next buffered one.
                                        (.finallyDo ^rx.Observable todo
                                                    (reify rx.util.functions.Action0
                                                      (call [this]
                                                        (swap! state-atom update-in [:in-flight] disj todo)
                                                        (advance! state-atom)))))))
                        (recur state-atom))))
         subscribe (fn [state-atom]
                     (rx/subscribe observable-source
                                   (fn [o]
                                     (swap! state-atom update-in [:buffered] conj o)
                                     (advance! state-atom))
                                   (fn [e]
                                     (swap! state-atom assoc :complete e)
                                     (advance! state-atom))
                                   (fn []
                                     (swap! state-atom assoc :complete :complete)
                                     (advance! state-atom))))
         observable (rx/fn->o (fn [observer]
                                (subscribe (new-state-atom observer))))]
     (if (:delay-error? options)
       (rx.Observable/mergeDelayError observable)
       (rx.Observable/merge observable)))))
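
The state transition inside chunk is a pure function, so it can be tried at a REPL without RxJava on the classpath. The following is a minimal sketch, not part of the gist: it copies the next-state logic into a top-level function that takes chunk-size as an explicit argument, uses keywords as stand-ins for Observables, and adds a hypothetical drain helper that applies the step until nothing more can be emitted.

```clojure
;; Stand-alone copy of the gist's next-state step. chunk-size is passed
;; explicitly here; in the gist it is closed over by chunk.
(defn next-state
  [chunk-size {:keys [complete buffered in-flight] :as old}]
  (cond
    (empty? buffered)
    [(or complete nil) old]

    (< (count in-flight) chunk-size)
    (let [next-o (first buffered)]
      [next-o (-> old
                  (update-in [:buffered] next)
                  (update-in [:in-flight] conj next-o))])

    :else
    [nil old]))

;; Hypothetical helper (not in the gist): apply next-state repeatedly and
;; collect what would be emitted, stopping on nil or :complete.
(defn drain
  [chunk-size state]
  (loop [state state, emitted []]
    (let [[todo new-state] (next-state chunk-size state)]
      (if (or (nil? todo) (= :complete todo))
        [emitted new-state]
        (recur new-state (conj emitted todo))))))

;; Keywords stand in for Observables (an assumption for illustration only).
(def initial {:in-flight #{} :buffered [:a :b :c] :complete false})
```

With a chunk-size of 2, `(drain 2 initial)` emits two of the three buffered items and leaves the third waiting. Removing one item from :in-flight, as the finallyDo callback does in the gist, and draining again releases the remaining one.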