Skip to content

Instantly share code, notes, and snippets.

@evant
Last active May 14, 2017 17:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save evant/7976ed19db6c31ac2a9c814ff8b12d3a to your computer and use it in GitHub Desktop.
Save evant/7976ed19db6c31ac2a9c814ff8b12d3a to your computer and use it in GitHub Desktop.
A single cache that allows evicting the value or error. Subscriptions after the value is evicted will re-trigger the upstream observer.
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
package me.tatarka.rxcache2;
import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.BiFunction;
import io.reactivex.functions.Function;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
/**
 * A {@link Single} cache that allows evicting the cached value or error. Subscriptions made after the cached
 * terminal event is evicted will re-trigger a subscription to the upstream source.
 *
 * <p>Life cycle, as implemented below:
 * <ol>
 * <li>The first subscriber triggers a subscription to the source; subscribers arriving before the source
 * terminates are registered and wait for the result.</li>
 * <li>When the source terminates, the result is stored, all waiting subscribers are notified, and the eviction
 * function is asked for a {@link Completable} which is then subscribed to.</li>
 * <li>Until that completable completes, new subscribers immediately receive the stored result.</li>
 * <li>When the completable completes, the cache is reset and the next subscriber starts over at step 1.</li>
 * </ol>
 *
 * <p>NOTE(review): {@link SingleObserver} and {@link CompletableObserver} share the {@code onError(Throwable)}
 * signature, so an error emitted by the eviction completable is handled exactly like an error of the source
 * (it becomes the cached error and the eviction function is invoked again). Confirm this is intended.
 */
public class EvictableSingleCache<T> extends Single<T> implements SingleObserver<T>, CompletableObserver {

    /** State marker: nothing is cached and no subscriber is currently registered. */
    @SuppressWarnings("rawtypes")
    static final CacheDisposable[] EMPTY = new CacheDisposable[0];

    /** State marker, compared by identity: a terminal event is cached and is replayed to new subscribers. */
    @SuppressWarnings("rawtypes")
    static final CacheDisposable[] TERMINATED = new CacheDisposable[0];

    /** The upstream whose terminal event is cached. */
    final SingleSource<? extends T> source;

    /** Produces the completable whose completion evicts the cached value or error. */
    final BiFunction<? super T, Throwable, Completable> evictionFunction;

    /** Counts subscribers of the current cycle; only the one that moves it off zero subscribes to the source. */
    final AtomicInteger wip;

    /** Currently registered subscribers, or one of the {@link #EMPTY} / {@link #TERMINATED} markers. */
    final AtomicReference<CacheDisposable<T>[]> observers;

    /** The cached success value; written before {@link #observers} is switched to {@link #TERMINATED}. */
    T value;

    /** The cached error; takes precedence over {@link #value} when replaying to late subscribers. */
    Throwable error;

    /**
     * Constructs a new cached single that is never evicted. This is equivalent to {@link Single#cache()}.
     *
     * @param source the single to cache
     */
    public EvictableSingleCache(Single<T> source) {
        this(source, Completable.never());
    }

    /**
     * Constructs a new cached single that's evicted when the given completable completes. The eviction
     * completable will be subscribed to when the source emits a value or error.
     *
     * @param source   the single to cache
     * @param eviction the completable whose completion evicts the cached value or error
     */
    public EvictableSingleCache(Single<T> source, final Completable eviction) {
        this(source, new BiFunction<T, Throwable, Completable>() {
            @Override
            public Completable apply(T t, Throwable throwable) throws Exception {
                return eviction;
            }
        });
    }

    /**
     * Constructs a new cached single that's evicted when the given completable completes. The success
     * completable will be subscribed to if the source emits a value, and the error completable will be subscribed
     * to if the source emits an error.
     *
     * @param source          the single to cache
     * @param successEviction evicts a cached value when it completes
     * @param errorEviction   evicts a cached error when it completes
     */
    public EvictableSingleCache(Single<T> source, final Completable successEviction, final Completable errorEviction) {
        this(source, new BiFunction<T, Throwable, Completable>() {
            @Override
            public Completable apply(T value, Throwable error) throws Exception {
                // A non-null value identifies the success case; the error case passes null here.
                return value != null ? successEviction : errorEviction;
            }
        });
    }

    /**
     * Constructs a new cached single that's evicted when the completable returned by one of the given functions
     * completes. The success function will be invoked if the source emits a value, and the error function will be
     * invoked if the source emits an error.
     *
     * @param source                  the single to cache
     * @param successEvictionFunction maps the emitted value to the completable that evicts it
     * @param errorEvictionFunction   maps the emitted error to the completable that evicts it
     */
    public EvictableSingleCache(Single<T> source, final Function<? super T, Completable> successEvictionFunction, final Function<Throwable, Completable> errorEvictionFunction) {
        this(source, new BiFunction<T, Throwable, Completable>() {
            @Override
            public Completable apply(T value, Throwable error) throws Exception {
                return (value != null ? successEvictionFunction.apply(value) : errorEvictionFunction.apply(error));
            }
        });
    }

    /**
     * Constructs a new cached single that's evicted when the completable returned by the given function
     * completes. The function will be invoked when the source emits a value or error; the argument that does not
     * apply is passed as {@code null}.
     *
     * @param source           the single to cache
     * @param evictionFunction maps the emitted value or error to the completable that evicts it
     */
    @SuppressWarnings("unchecked")
    public EvictableSingleCache(Single<T> source, BiFunction<? super T, Throwable, Completable> evictionFunction) {
        this.source = source;
        this.wip = new AtomicInteger();
        this.evictionFunction = evictionFunction;
        this.observers = new AtomicReference<CacheDisposable<T>[]>(EMPTY);
    }

    /**
     * Registers the subscriber for the pending result, or replays the cached terminal event if there is one. The
     * first registered subscriber of a cycle subscribes to the source.
     */
    @Override
    protected void subscribeActual(SingleObserver<? super T> s) {
        CacheDisposable<T> d = new CacheDisposable<>(s, this);
        s.onSubscribe(d);

        if (!add(d)) {
            // A terminal event is cached: replay it, error first.
            Throwable ex = error;
            if (ex != null) {
                s.onError(ex);
            } else {
                s.onSuccess(value);
            }
            return;
        }

        // The subscriber may have disposed inside onSubscribe, before it was registered.
        if (d.isDisposed()) {
            remove(d);
        }

        if (wip.getAndIncrement() == 0) {
            source.subscribe(this);
        }
    }

    /**
     * Atomically appends the given subscriber to the registered subscribers.
     *
     * @param observer the subscriber to register
     * @return {@code true} if registered, {@code false} if a terminal event is already cached
     */
    boolean add(CacheDisposable<T> observer) {
        for (; ; ) {
            CacheDisposable<T>[] a = observers.get();
            if (a == TERMINATED) {
                return false;
            }
            int n = a.length;
            @SuppressWarnings("unchecked")
            CacheDisposable<T>[] b = new CacheDisposable[n + 1];
            System.arraycopy(a, 0, b, 0, n);
            b[n] = observer;
            if (observers.compareAndSet(a, b)) {
                return true;
            }
            // Lost the race against another add/remove/terminate; retry on the fresh array.
        }
    }

    /**
     * Atomically removes the given subscriber from the registered subscribers; does nothing if it is not
     * registered.
     *
     * @param observer the subscriber to unregister
     */
    @SuppressWarnings("unchecked")
    void remove(CacheDisposable<T> observer) {
        for (; ; ) {
            CacheDisposable<T>[] a = observers.get();
            int n = a.length;
            if (n == 0) {
                // EMPTY or TERMINATED: nothing to remove.
                return;
            }

            int j = -1;
            for (int i = 0; i < n; i++) {
                if (a[i] == observer) {
                    j = i;
                    break;
                }
            }
            if (j < 0) {
                return;
            }

            CacheDisposable<T>[] b;
            if (n == 1) {
                b = EMPTY;
            } else {
                b = new CacheDisposable[n - 1];
                System.arraycopy(a, 0, b, 0, j);
                System.arraycopy(a, j + 1, b, j, n - j - 1);
            }
            if (observers.compareAndSet(a, b)) {
                return;
            }
        }
    }

    @Override
    public void onSubscribe(Disposable d) {
        // not supported by this operator
    }

    /**
     * Caches the value, notifies all waiting subscribers and subscribes to the eviction completable.
     */
    @Override
    public void onSuccess(T value) {
        // Drop an error left over from an evicted cycle; otherwise late subscribers, which check the error
        // first, would be handed the stale error instead of this value.
        this.error = null;
        this.value = value;
        for (CacheDisposable<T> d : terminate()) {
            if (!d.isDisposed()) {
                d.actual.onSuccess(value);
            }
        }
        subscribeToEviction(value, null);
    }

    /**
     * Caches the error, notifies all waiting subscribers and subscribes to the eviction completable.
     */
    @Override
    public void onError(Throwable e) {
        // Drop a value left over from an evicted cycle so only one terminal event is ever cached.
        this.value = null;
        this.error = e;
        for (CacheDisposable<T> d : terminate()) {
            if (!d.isDisposed()) {
                d.actual.onError(e);
            }
        }
        subscribeToEviction(null, e);
    }

    /**
     * Evicts the cached terminal event so that the next subscriber re-subscribes to the source.
     */
    @Override
    public void onComplete() {
        // Reset the counter BEFORE re-opening registration. In the opposite order a subscriber could register
        // in between, see a non-zero counter, skip subscribing to the source and then have its increment wiped
        // out, leaving it waiting with nobody subscribed upstream.
        wip.set(0);
        observers.set(EMPTY);
    }

    /** Switches to the {@link #TERMINATED} state and returns the subscribers that were waiting. */
    @SuppressWarnings("unchecked")
    private CacheDisposable<T>[] terminate() {
        return observers.getAndSet(TERMINATED);
    }

    /** Asks the eviction function for a completable and subscribes this cache to it. */
    private void subscribeToEviction(T value, Throwable error) {
        try {
            Completable eviction = evictionFunction.apply(value, error);
            // Identity check: skip the subscription for the instance returned by Completable.never().
            if (eviction != Completable.never()) {
                eviction.subscribe(this);
            }
        } catch (Exception ex) {
            throw Exceptions.propagate(ex);
        }
    }

    /**
     * The disposable handed to each downstream subscriber; the boolean state is the disposed flag.
     */
    static final class CacheDisposable<T>
            extends AtomicBoolean
            implements Disposable {

        private static final long serialVersionUID = 7514387411091976596L;

        /** The downstream subscriber. */
        final SingleObserver<? super T> actual;

        /** The cache this subscriber is registered with. */
        final EvictableSingleCache<T> parent;

        CacheDisposable(SingleObserver<? super T> actual, EvictableSingleCache<T> parent) {
            this.actual = actual;
            this.parent = parent;
        }

        @Override
        public boolean isDisposed() {
            return get();
        }

        @Override
        public void dispose() {
            // Only the first dispose call unregisters from the parent.
            if (compareAndSet(false, true)) {
                parent.remove(this);
            }
        }
    }
}
@evant
Copy link
Author

evant commented May 14, 2017

Examples:

Cache value for 10 minutes.

new EvictableSingleCache<>(source, Completable.timer(10, TimeUnit.MINUTES))

Don't cache errors.

new EvictableSingleCache<>(source, Completable.never(), Completable.complete())

Clear cache on button click.

new EvictableSingleCache<>(source, RxView.clicks(button).firstOrError().toCompletable())

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment