Created
March 4, 2015 04:34
-
-
Save rlac/8beff5ac4ed2fafd48d2 to your computer and use it in GitHub Desktop.
Observable stream switcher
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Abstraction over {@link Observable#switchOnNext(Observable)}, allowing an Observable to publish | |
* results from multiple streams that may be switched over time, for example from an event source | |
* that may change. | |
* | |
* https://stackoverflow.com/questions/21836818/subscribing-to-a-future-observable/21839376#21839376 | |
*/ | |
public final class StreamSwitcher<T> { | |
Subject<Observable<T>, Observable<T>> publisher; | |
Observable<T> stream; | |
public static <T> StreamSwitcher<T> createBehaviorStreamSwitcher() { | |
return new StreamSwitcher<>(BehaviorSubject.<T>create()); | |
} | |
public static <T> StreamSwitcher<T> createPublishStreamSwitcher() { | |
return new StreamSwitcher<>(PublishSubject.<T>create()); | |
} | |
private StreamSwitcher(Subject subject) { | |
publisher = subject; | |
stream = Observable.switchOnNext(publisher); | |
} | |
/** | |
* @return the Observable to subscribe to. | |
*/ | |
public Observable<T> getStream() { | |
return stream; | |
} | |
/** | |
* @param newStream the new underlying Observable to stream from. | |
*/ | |
public void switchStream(Observable<T> newStream) { | |
publisher.onNext(newStream); | |
} | |
/** | |
* Suspends the Observable. No further items will be emitted until a the next call to | |
* {@link #switchStream(Observable)}. | |
*/ | |
public void suspendStream() { | |
publisher.onNext(Observable.<T>never()); | |
} | |
/** | |
* Stops the stream. No items will be emitted after this call, and calls to {@link | |
* #switchStream(Observable)} will have no effect. | |
*/ | |
public void stopStream() { | |
publisher.onNext(Observable.<T>empty()); | |
publisher.onCompleted(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment