Skip to content

Instantly share code, notes, and snippets.

@timmc
Last active August 29, 2015 14:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save timmc/2989d79f5cb8504089d9 to your computer and use it in GitHub Desktop.
Save timmc/2989d79f5cb8504089d9 to your computer and use it in GitHub Desktop.
Haven't tested this. Beware.
package rx.operators;
import java.util.concurrent.atomic.AtomicBoolean;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.subjects.ReplaySubject;
/**
 * An observable that caches the results (like {@link Observable#cache()})
 * and additionally provides a way to find out whether various events have
 * yet occurred on the shared source subscription.
 * <p>
 * The same warnings apply as with {@link OperationCache}. In particular, the
 * subscription to the source is never handed back to callers: unsubscribing
 * a downstream observer only detaches it from the cache.
 *
 * @param <T> the type of item emitted by this observable
 */
public abstract class PeekableCacheObservable<T> extends Observable<T> {

    /**
     * Create a cached Observable based on a source. The source will only
     * be subscribed to once this Observable is subscribed to, and only once.
     * <p>
     * Every event from the source passes through a single forwarding observer
     * that first sets the matching flag and then hands the event to the cache.
     * A flag is therefore already set by the time any observer of the returned
     * Observable receives the corresponding event.
     *
     * @param <T> the type of item emitted by the returned observable
     * @param source the Observable whose events are cached
     * @return an Observable that shares one subscription to {@code source}
     *         and exposes which kinds of events that subscription has seen
     */
    public static <T> PeekableCacheObservable<T> create(final Observable<? extends T> source) {
        final AtomicBoolean subscribeTripped = new AtomicBoolean(false);
        final AtomicBoolean onNextTripped = new AtomicBoolean(false);
        final AtomicBoolean onErrorTripped = new AtomicBoolean(false);
        final AtomicBoolean onCompleteTripped = new AtomicBoolean(false);
        final ReplaySubject<T> cache = ReplaySubject.create();

        final OnSubscribeFunc<T> onSubscribe = new OnSubscribeFunc<T>() {
            @Override
            public Subscription onSubscribe(Observer<? super T> observer) {
                // Only the first caller wins the compare-and-set, so the
                // source gets exactly *one* observer.
                if (subscribeTripped.compareAndSet(false, true)) {
                    // Set the flag first, then forward to the cache. Attaching
                    // a separate flag observer to the cache after connecting
                    // the source would let cached events become visible to
                    // other subscribers while the flags still read false.
                    source.subscribe(new Observer<T>() {
                        @Override
                        public void onNext(T args) {
                            onNextTripped.set(true);
                            cache.onNext(args);
                        }

                        @Override
                        public void onError(Throwable e) {
                            onErrorTripped.set(true);
                            cache.onError(e);
                        }

                        @Override
                        public void onCompleted() {
                            onCompleteTripped.set(true);
                            cache.onCompleted();
                        }
                    });
                }
                // Set any number of observers on the cache
                return cache.subscribe(observer);
            }
        };

        return new PeekableCacheObservable<T>(onSubscribe) {
            @Override
            public boolean hasSubscribed() {
                return subscribeTripped.get();
            }

            @Override
            public boolean hasNextTripped() {
                return onNextTripped.get();
            }

            @Override
            public boolean hasErrorTripped() {
                return onErrorTripped.get();
            }

            @Override
            public boolean hasCompleteTripped() {
                return onCompleteTripped.get();
            }
        };
    }

    /**
     * Package-private constructor; instances are obtained via {@link #create(Observable)}.
     *
     * @param onSubscribe the function invoked for each subscription to this observable
     */
    PeekableCacheObservable(OnSubscribeFunc<T> onSubscribe) {
        super(onSubscribe);
    }

    /**
     * Return true iff a subscription has been started on the source.
     * <p>
     * The flag is set immediately before the source is subscribed to, so a
     * concurrent caller may observe {@code true} slightly before that
     * subscription call has returned.
     */
    public abstract boolean hasSubscribed();

    /**
     * Return true iff at least one onNext event has fired after the subscription started.
     */
    public abstract boolean hasNextTripped();

    /**
     * Return true iff an onError event has fired after the subscription started.
     */
    public abstract boolean hasErrorTripped();

    /**
     * Return true iff an onCompleted event has fired after the subscription started.
     */
    public abstract boolean hasCompleteTripped();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment