Skip to content

Instantly share code, notes, and snippets.

@joaocsousa
Created September 28, 2017 21:16
Show Gist options
  • Save joaocsousa/4d6929f8f7af5a7af44cba8581675450 to your computer and use it in GitHub Desktop.
Save joaocsousa/4d6929f8f7af5a7af44cba8581675450 to your computer and use it in GitHub Desktop.
CacheObservable: an ObservableTransformer that caches the latest emission for a configurable period of time
import java.util.concurrent.TimeUnit;
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.ObservableTransformer;
import io.reactivex.Observer;
import io.reactivex.functions.Consumer;
/**
 * Transformer that remembers the most recent item emitted by the upstream and
 * replays it to later subscribers while it is younger than the configured timeout.
 *
 * <p>Use it with {@code Observable.compose(CacheObservable.<T>cache(timeout, unit))}.
 * Every call to {@link #apply(Observable)} creates its own {@code LastSeen} holder,
 * so the cached value is scoped to the composed stream that call returns.
 *
 * @param <T> type of the items flowing through the stream
 */
public static class CacheObservable<T> implements ObservableTransformer<T, T> {

    /** How long a recorded item stays replayable, expressed in {@link #unit}. */
    private final long timeout;

    /** Unit in which {@link #timeout} is expressed. */
    private final TimeUnit unit;

    private CacheObservable(long timeout, TimeUnit unit) {
        this.unit = unit;
        this.timeout = timeout;
    }

    /**
     * Creates a caching transformer.
     *
     * @param timeout how long a recorded item may be replayed
     * @param unit    unit of {@code timeout}
     * @param <T>     type of the items flowing through the stream
     * @return a new transformer instance
     */
    public static <T> CacheObservable<T> cache(long timeout, TimeUnit unit) {
        return new CacheObservable<>(timeout, unit);
    }

    /**
     * Wraps {@code upstream} so each emitted item is recorded, and so new
     * subscriptions are answered from that record while it is still valid.
     *
     * @param upstream the stream to cache
     * @return the caching stream
     */
    @Override
    public ObservableSource<T> apply(Observable<T> upstream) {
        final LastSeen<T> cacheState = new LastSeen<>(timeout, unit);
        // doOnNext feeds every upstream item into the holder before it reaches the observer.
        final Observable<T> recordingUpstream = upstream.doOnNext(cacheState);
        return new LastSeenObservable<>(recordingUpstream, cacheState);
    }
}
/**
 * Holder for the most recently observed item together with the instant it was seen.
 *
 * <p>Registered as a {@code doOnNext} consumer, so {@link #accept(Object)} runs for every
 * upstream emission. Age is measured with {@link System#nanoTime()}, which is monotonic,
 * instead of the wall clock; adjustments to the system time therefore can neither expire
 * the cached item early nor keep it alive past its timeout.
 *
 * @param <T> type of the cached item
 */
static final class LastSeen<T> implements Consumer<T> {

    private final long timeout;
    private final TimeUnit unit;

    /**
     * {@link System#nanoTime()} reading taken when {@link #value} was last stored. Only
     * meaningful as a difference against another {@code nanoTime()} reading.
     */
    private long lastEmissionNanos;

    /** Most recent item, or {@code null} while nothing has been recorded yet. */
    private volatile T value;

    /**
     * @param timeout how long a recorded item is considered valid
     * @param unit    unit of {@code timeout}
     */
    LastSeen(long timeout, TimeUnit unit) {
        this.timeout = timeout;
        this.unit = unit;
    }

    /**
     * Records {@code latest} as the cached item and restarts its validity window.
     *
     * @param latest the item just emitted by the upstream
     */
    @Override
    public void accept(T latest) {
        // The timestamp is written before the volatile store to `value`, so a thread that
        // reads the new value through the volatile field also sees this timestamp.
        lastEmissionNanos = System.nanoTime();
        value = latest;
    }

    /**
     * @return {@code true} when an item has been recorded and its age does not exceed the
     *         configured timeout
     */
    boolean isValid() {
        if (value == null) {
            return false;
        }
        // Subtracting two nanoTime readings is the overflow-safe way to compute elapsed time.
        long ageNanos = System.nanoTime() - lastEmissionNanos;
        return ageNanos <= unit.toNanos(timeout);
    }
}
/**
 * Observable that answers each subscription either from the cached item held by a
 * {@code LastSeen} or, when that item is missing or expired, by subscribing to the upstream.
 *
 * @param <T> type of the emitted items
 */
static final class LastSeenObservable<T> extends Observable<T> {

    private final Observable<T> upstream;
    private final LastSeen<T> lastSeen;

    /**
     * @param upstream stream to subscribe to when no valid cached item exists
     * @param lastSeen holder consulted for the cached item
     */
    LastSeenObservable(Observable<T> upstream, LastSeen<T> lastSeen) {
        this.upstream = upstream;
        this.lastSeen = lastSeen;
    }

    /**
     * Replays the cached item if it is still valid; otherwise delegates to the upstream.
     *
     * <p>The cached item is delivered through {@code Observable.just(...)} rather than by
     * calling {@code observer.onNext(...)} directly. That way the observer first receives
     * {@code onSubscribe} and afterwards {@code onComplete}, as the Observable protocol
     * requires; calling {@code onNext} on its own leaves the observer without a
     * {@code Disposable} and without a terminal event.
     *
     * @param observer the subscribing observer
     */
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (lastSeen.isValid()) {
            // Read the volatile field once into a local before replaying it.
            T cached = lastSeen.value;
            Observable.just(cached).subscribe(observer);
        } else {
            upstream.subscribe(observer);
        }
    }
}
// Factory for the source value; the println makes every real invocation visible in the log.
Callable<Integer> valueFactory = new Callable<Integer>() {
    @Override
    public Integer call() throws Exception {
        System.out.println("Creating observable");
        return 156;
    }
};
// Compose with a 5-second cache around the factory-backed source.
Observable<Integer> observable = Observable.fromCallable(valueFactory)
        .compose(CacheObservable.<Integer>cache(5, TimeUnit.SECONDS));
--------------subscribed multiple times--------------
// Print every item this subscription receives.
observable.subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer received) throws Exception {
        System.out.println("New Value: " + received);
    }
});
--------------output--------------
// the source observable is re-created only after the 5-second cache timeout has elapsed
00:00:28.015 Creating observable
00:00:28.015 New Value: 156
00:00:30.655 New Value: 156
00:00:32.335 New Value: 156
00:00:34.455 Creating observable
00:00:34.455 New Value: 156
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment