Skip to content

Instantly share code, notes, and snippets.

@bobymicroby
Created June 19, 2016 09:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bobymicroby/4a861012dc892b0090a129e29f3ba11e to your computer and use it in GitHub Desktop.
Save bobymicroby/4a861012dc892b0090a129e29f3ba11e to your computer and use it in GitHub Desktop.
import rx.Observable;
import rx.Subscriber;
import rx.functions.Func0;
public class OnSubscribeTTLCache<T> implements Observable.OnSubscribe<T>
{
private final Observable<T> source;
private final long ttl;
private final Func0<Long> clock;
private volatile Observable<T> current;
private volatile long nextExpirationTime;
public static <T> Observable<T> cache(Observable<T> source, long ttl, Func0<Long> clock)
{
return Observable.create(new OnSubscribeTTLCache<>(source, ttl, clock));
}
public OnSubscribeTTLCache(Observable<T> source, long ttl, Func0<Long> clock)
{
this.ttl = ttl;
this.source = source;
this.clock = clock;
this.current = source;
this.nextExpirationTime = Long.MIN_VALUE;
}
@Override
public void call(Subscriber<? super T> subscriber)
{
final long currentTime = clock.call();
if (currentTime > nextExpirationTime)
{
nextExpirationTime = currentTime + ttl;
current = source.cacheWithInitialCapacity(1);
}
current.unsafeSubscribe(subscriber);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment