-
-
Save jasennett/14ad32e9286a01e427b8fb7d80dc09ac to your computer and use it in GitHub Desktop.
DisposableSingleCache
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.atomic.AtomicBoolean; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.concurrent.atomic.AtomicReference; | |
import io.reactivex.Single; | |
import io.reactivex.SingleObserver; | |
import io.reactivex.SingleSource; | |
import io.reactivex.disposables.Disposable; | |
public final class DisposableSingleCache<T> extends Single<T> implements SingleObserver<T>, Disposable | |
{ | |
@SuppressWarnings("rawtypes") | |
static final CacheDisposable[] EMPTY = new CacheDisposable[0]; | |
@SuppressWarnings("rawtypes") | |
static final CacheDisposable[] TERMINATED = new CacheDisposable[0]; | |
@SuppressWarnings("rawtypes") | |
static final CacheDisposable[] DISPOSED = new CacheDisposable[0]; | |
final SingleSource<? extends T> source; | |
final AtomicInteger wip; | |
final AtomicReference<CacheDisposable<T>[]> observers; | |
T value; | |
Throwable error; | |
private Disposable sourceDisposable; | |
private final Object disposeLock; | |
@SuppressWarnings("unchecked") | |
public DisposableSingleCache(SingleSource<? extends T> source) { | |
this.source = source; | |
this.wip = new AtomicInteger(); | |
this.observers = new AtomicReference<CacheDisposable<T>[]>(EMPTY); | |
this.disposeLock = new Object(); | |
} | |
@SuppressWarnings("unchecked") | |
@Override | |
public void dispose() { | |
CacheDisposable<T>[] previousObservers; | |
synchronized (disposeLock) { | |
for(;;) { | |
previousObservers = observers.get(); | |
if (previousObservers == TERMINATED) { | |
break; | |
} else { | |
if (observers.compareAndSet(previousObservers, DISPOSED)) { | |
break; | |
} | |
} | |
} | |
} | |
if (previousObservers != DISPOSED && previousObservers != TERMINATED) { | |
if (sourceDisposable != null) { | |
sourceDisposable.dispose(); | |
} | |
for (CacheDisposable<T> d : previousObservers) { | |
d.dispose(); | |
} | |
} | |
} | |
@Override | |
public boolean isDisposed() { | |
return observers.get() == DISPOSED; | |
} | |
@Override | |
protected void subscribeActual(final SingleObserver<? super T> s) { | |
CacheDisposable<T> d = new CacheDisposable<T>(s, this); | |
s.onSubscribe(d); | |
if (add(d)) { | |
if (d.isDisposed()) { | |
remove(d); | |
} | |
} else { | |
if (observers.get() == DISPOSED) { | |
s.onError(new IllegalStateException("Source disposable already disposed")); | |
} else { | |
Throwable ex = error; | |
if (ex != null) { | |
s.onError(ex); | |
} else { | |
s.onSuccess(value); | |
} | |
} | |
return; | |
} | |
if (wip.getAndIncrement() == 0) { | |
source.subscribe(this); | |
} | |
} | |
boolean add(CacheDisposable<T> observer) { | |
for (;;) { | |
CacheDisposable<T>[] a = observers.get(); | |
if (a == TERMINATED || a == DISPOSED) { | |
return false; | |
} | |
int n = a.length; | |
@SuppressWarnings("unchecked") | |
CacheDisposable<T>[] b = new CacheDisposable[n + 1]; | |
System.arraycopy(a, 0, b, 0, n); | |
b[n] = observer; | |
if (observers.compareAndSet(a, b)) { | |
return true; | |
} | |
} | |
} | |
@SuppressWarnings("unchecked") | |
void remove(CacheDisposable<T> observer) { | |
for (;;) { | |
CacheDisposable<T>[] a = observers.get(); | |
int n = a.length; | |
if (n == 0) { | |
return; | |
} | |
int j = -1; | |
for (int i = 0; i < n; i++) { | |
if (a[i] == observer) { | |
j = i; | |
break; | |
} | |
} | |
if (j < 0) { | |
return; | |
} | |
CacheDisposable<T>[] b; | |
if (n == 1) { | |
b = EMPTY; | |
} else { | |
b = new CacheDisposable[n - 1]; | |
System.arraycopy(a, 0, b, 0, j); | |
System.arraycopy(a, j + 1, b, j, n - j - 1); | |
} | |
if (observers.compareAndSet(a, b)) { | |
return; | |
} | |
} | |
} | |
@Override | |
public void onSubscribe(Disposable d) { | |
boolean isDisposed; | |
synchronized (disposeLock) { | |
isDisposed = observers.get() == DISPOSED; | |
if (!isDisposed && observers.get() != TERMINATED) { | |
sourceDisposable = d; | |
} | |
} | |
if (isDisposed) { | |
d.dispose(); | |
} | |
} | |
@SuppressWarnings("unchecked") | |
@Override | |
public void onSuccess(T value) { | |
this.value = value; | |
for (CacheDisposable<T> d : observers.getAndSet(TERMINATED)) { | |
if (!d.isDisposed()) { | |
d.actual.onSuccess(value); | |
} | |
} | |
} | |
@SuppressWarnings("unchecked") | |
@Override | |
public void onError(Throwable e) { | |
this.error = e; | |
for (CacheDisposable<T> d : observers.getAndSet(TERMINATED)) { | |
if (!d.isDisposed()) { | |
d.actual.onError(e); | |
} | |
} | |
} | |
static final class CacheDisposable<T> | |
extends AtomicBoolean | |
implements Disposable { | |
final SingleObserver<? super T> actual; | |
final DisposableSingleCache<T> parent; | |
CacheDisposable(SingleObserver<? super T> actual, DisposableSingleCache<T> parent) { | |
this.actual = actual; | |
this.parent = parent; | |
} | |
@Override | |
public boolean isDisposed() { | |
return get(); | |
} | |
@Override | |
public void dispose() { | |
if (compareAndSet(false, true)) { | |
parent.remove(this); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment