Last active
August 29, 2015 14:04
-
-
Save timmc/2989d79f5cb8504089d9 to your computer and use it in GitHub Desktop.
Haven't tested this. Beware.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package rx.operators; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
import rx.Observable; | |
import rx.Observer; | |
import rx.Subscription; | |
import rx.subjects.ReplaySubject; | |
/** | |
* An observable that caches the results (like {@link Observable#cache()}) | |
* and additionally provides a way to find out whether various events have | |
* yet occurred on the shared source subscription. | |
* <p> | |
* The same warnings apply as with {@link OperationCache}. | |
*/ | |
public abstract class PeekableCacheObservable<T> extends Observable<T> { | |
/** | |
* Create a cached Observable based on a source. The source will only | |
* be subscribed to once this Observable is subscribed to, and only once. | |
*/ | |
public static <T> PeekableCacheObservable<T> create(final Observable<? extends T> source) { | |
final AtomicBoolean subscribeTripped = new AtomicBoolean(false); | |
final AtomicBoolean onNextTripped = new AtomicBoolean(false); | |
final AtomicBoolean onErrorTripped = new AtomicBoolean(false); | |
final AtomicBoolean onCompleteTripped = new AtomicBoolean(false); | |
final ReplaySubject<T> cache = ReplaySubject.create(); | |
OnSubscribeFunc<T> onSubscribe = new OnSubscribeFunc<T>() { | |
@Override | |
public Subscription onSubscribe(Observer<? super T> observer) { | |
if (subscribeTripped.compareAndSet(false, true)) { | |
// Set *one* observer for the source | |
source.subscribe(cache); | |
// Set flags once various events have occurred | |
cache.subscribe(new Observer<T>() { | |
public void onNext(T args) { onNextTripped.set(true); } | |
public void onError(Throwable e) { onErrorTripped.set(true); } | |
public void onCompleted() { onCompleteTripped.set(true); } | |
}); | |
} | |
// Set any number of observers on the cache | |
return cache.subscribe(observer); | |
} | |
}; | |
return new PeekableCacheObservable<T>(onSubscribe) { | |
public boolean hasSubscribed() { return subscribeTripped.get(); } | |
public boolean hasNextTripped() { return onNextTripped.get(); } | |
public boolean hasErrorTripped() { return onErrorTripped.get(); } | |
public boolean hasCompleteTripped() { return onCompleteTripped.get(); }; | |
}; | |
} | |
PeekableCacheObservable(OnSubscribeFunc<T> onSubscribe) { | |
super(onSubscribe); | |
} | |
/** | |
* Return true iff a subscription has been started on the source. | |
*/ | |
public abstract boolean hasSubscribed(); | |
/** | |
* Return true iff at least one onNext event has fired after the subscription started. | |
*/ | |
public abstract boolean hasNextTripped(); | |
/** | |
* Return true iff at least one onError event has fired after the subscription started. | |
*/ | |
public abstract boolean hasErrorTripped(); | |
/** | |
* Return true iff at least one onComplete event has fired after the subscription started. | |
*/ | |
public abstract boolean hasCompleteTripped(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment