Skip to content

Instantly share code, notes, and snippets.

@rlac
Created March 4, 2015 04:34
Show Gist options
  • Save rlac/8beff5ac4ed2fafd48d2 to your computer and use it in GitHub Desktop.
Save rlac/8beff5ac4ed2fafd48d2 to your computer and use it in GitHub Desktop.
Observable stream switcher
/**
 * Abstraction over {@link Observable#switchOnNext(Observable)}, allowing an Observable to publish
 * results from multiple streams that may be switched over time, for example from an event source
 * that may change.
 *
 * <p>Internally a {@link Subject} carries the sequence of "current" source Observables, and
 * {@code switchOnNext} flattens that sequence so subscribers only ever see items from the most
 * recently supplied source.
 *
 * https://stackoverflow.com/questions/21836818/subscribing-to-a-future-observable/21839376#21839376
 *
 * @param <T> the type of item emitted by the switched stream.
 */
public final class StreamSwitcher<T> {

    /** Carries the sequence of source Observables; each emission replaces the active source. */
    private final Subject<Observable<T>, Observable<T>> publisher;

    /** The flattened view of {@link #publisher} that is handed out to subscribers. */
    private final Observable<T> stream;

    /**
     * Creates a switcher backed by a {@link BehaviorSubject}.
     *
     * @param <T> the type of item emitted by the switched stream.
     * @return a new switcher whose source sequence is published through a BehaviorSubject.
     */
    public static <T> StreamSwitcher<T> createBehaviorStreamSwitcher() {
        // The subject emits Observables of T, so the type witness is Observable<T>, not T.
        return new StreamSwitcher<>(BehaviorSubject.<Observable<T>>create());
    }

    /**
     * Creates a switcher backed by a {@link PublishSubject}.
     *
     * @param <T> the type of item emitted by the switched stream.
     * @return a new switcher whose source sequence is published through a PublishSubject.
     */
    public static <T> StreamSwitcher<T> createPublishStreamSwitcher() {
        // The subject emits Observables of T, so the type witness is Observable<T>, not T.
        return new StreamSwitcher<>(PublishSubject.<Observable<T>>create());
    }

    /**
     * @param subject the subject used to publish each new source Observable.
     */
    private StreamSwitcher(Subject<Observable<T>, Observable<T>> subject) {
        publisher = subject;
        stream = Observable.switchOnNext(publisher);
    }

    /**
     * @return the Observable to subscribe to.
     */
    public Observable<T> getStream() {
        return stream;
    }

    /**
     * @param newStream the new underlying Observable to stream from.
     */
    public void switchStream(Observable<T> newStream) {
        publisher.onNext(newStream);
    }

    /**
     * Suspends the Observable. No further items will be emitted until the next call to
     * {@link #switchStream(Observable)}.
     */
    public void suspendStream() {
        // Switching to a source that never emits or terminates silences the stream without ending it.
        publisher.onNext(Observable.<T>never());
    }

    /**
     * Stops the stream. No items will be emitted after this call, and calls to {@link
     * #switchStream(Observable)} will have no effect.
     */
    public void stopStream() {
        // Replace the active source with an empty one first, then complete the publisher itself.
        publisher.onNext(Observable.<T>empty());
        publisher.onCompleted();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment