Skip to content

Instantly share code, notes, and snippets.

@kakai248
Created February 6, 2018 11:38
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kakai248/592d8526b601e02286c531de772482c0 to your computer and use it in GitHub Desktop.
Save kakai248/592d8526b601e02286c531de772482c0 to your computer and use it in GitHub Desktop.
package me.mesmo.app.utils.cache;
import com.jakewharton.rxrelay2.PublishRelay;
import com.jakewharton.rxrelay2.Relay;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import io.reactivex.Completable;
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.Single;
import io.reactivex.internal.functions.ObjectHelper;
/**
 * Simulates a BehaviorRelay but with a timed value. The most recently accepted value is cached
 * together with the wall-clock time at which it was accepted; once the configured timeout has
 * elapsed the value is considered expired and is no longer replayed to new subscribers.
 * Also supports manual invalidation through {@link #invalidate()}.
 *
 * <p>Live values passed to {@link #accept(Object)} are always forwarded to the observers that are
 * subscribed at that moment, regardless of the cache state.
 *
 * @param <T> The type to hold on this relay
 */
public class TimedBehaviorRelay<T> extends Relay<T> {

    private static final int DEFAULT_TIMEOUT_MS = 300000; // 5 minutes

    private final long timeout;
    private final TimeUnit unit;

    // Holds the last accepted value and its timestamp; null means "nothing cached".
    private final AtomicReference<TimedValue<T>> valueRef = new AtomicReference<>();

    // We hold the value ourselves so we don't need an actual BehaviorRelay, just a Publish.
    private final PublishRelay<T> relay = PublishRelay.create();

    /**
     * Creates a relay whose cached value expires after the default timeout (5 minutes).
     *
     * @param <T> The type to hold on the relay
     * @return A new, empty relay
     */
    public static <T> TimedBehaviorRelay<T> create() {
        return new TimedBehaviorRelay<>(DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
    }

    /**
     * Creates a relay whose cached value expires after the given timeout.
     *
     * @param timeout How long a cached value stays valid, expressed in {@code unit}
     * @param unit    The unit of {@code timeout}; must not be null
     * @param <T>     The type to hold on the relay
     * @return A new, empty relay
     */
    public static <T> TimedBehaviorRelay<T> createWithTimeout(long timeout, TimeUnit unit) {
        return new TimedBehaviorRelay<>(timeout, unit);
    }

    private TimedBehaviorRelay(long timeout, TimeUnit unit) {
        this.timeout = timeout;
        this.unit = ObjectHelper.requireNonNull(unit, "unit is null");
    }

    /**
     * Drops the cached value so it will not be replayed to future subscribers.
     * Observers that are already subscribed are not notified.
     */
    public void invalidate() {
        valueRef.set(null);
    }

    /**
     * Tells whether a cached value exists and has not yet expired.
     *
     * @return {@code true} if a value is cached and still inside the time window
     */
    public boolean isValid() {
        TimedValue<T> value = valueRef.get();
        return value != null && insideTimeWindow(value.time);
    }

    /**
     * Observes this source but first runs the completable if the cache is invalid.
     *
     * <p>The validity check is performed when this method is called, not when the returned
     * observable is subscribed to.
     *
     * @param completable The completable to run before subscribing to this relay
     * @return This relay, preceded by {@code completable} when the cache was invalid
     */
    public Observable<T> observeStartWith(Completable completable) {
        if (!isValid()) {
            return completable.andThen(this);
        }
        return this;
    }

    /**
     * Observes this source but first runs the single and caches its value if the cache is invalid.
     *
     * <p>The validity check is performed when this method is called, not when the returned
     * observable is subscribed to.
     *
     * @param single The single to run and cache
     * @return This relay, preceded by {@code single} (whose result is fed into
     *         {@link #accept(Object)}) when the cache was invalid
     */
    public Observable<T> observeAndCache(Single<T> single) {
        if (!isValid()) {
            return single.doOnSuccess(this).toCompletable().andThen(this);
        }
        return this;
    }

    /**
     * Caches the value with the current timestamp and forwards it to the current observers.
     *
     * @param value The value to cache and emit; must not be null
     * @throws NullPointerException if {@code value} is null
     */
    @Override
    public void accept(T value) {
        // Reject null up front so a null is never stored in the cache and replayed later.
        ObjectHelper.requireNonNull(value, "value == null");
        valueRef.set(new TimedValue<>(value, System.currentTimeMillis()));
        relay.accept(value);
    }

    @Override
    public boolean hasObservers() {
        return relay.hasObservers();
    }

    /**
     * Subscribes the observer to live values and, if a non-expired value is cached, delivers
     * that value to this observer first.
     */
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // Read the reference once: checking validity and then re-reading the reference would
        // let a concurrent invalidate() slip in between and cause a NullPointerException.
        TimedValue<T> cached = valueRef.get();

        if (cached != null && insideTimeWindow(cached.time)) {
            // Replay the cached value to the new observer only. Pushing it through
            // relay.accept() would re-deliver it to every observer already subscribed.
            relay.startWith(cached.value).subscribe(observer);
        } else {
            relay.subscribe(observer);
        }
    }

    // True while fewer than `timeout` (converted to milliseconds) have elapsed since `time`.
    private boolean insideTimeWindow(long time) {
        long now = System.currentTimeMillis();
        return now - time < unit.toMillis(timeout);
    }

    /** Immutable pair of a value and the wall-clock time (ms) at which it was accepted. */
    private static class TimedValue<T> {
        private final T value;
        private final long time;

        TimedValue(T value, long time) {
            this.value = value;
            this.time = time;
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment