Created
August 7, 2015 19:43
-
-
Save akarnokd/a2669817708827350663 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
private static class SubscriptionProducer<S, T> implements Producer { | |
@SuppressWarnings("rawtypes") | |
private static final AtomicLongFieldUpdater<SubscriptionProducer> REQUEST_COUNT = AtomicLongFieldUpdater.newUpdater(SubscriptionProducer.class, "requestCount"); | |
private volatile long requestCount = 0l; | |
private boolean hasTerminated = false; | |
private S state; | |
private final Subscriber<? super T> facadeSubscriber; | |
private final Subscriber<? super T> actualSubscriber; | |
private SyncOnSubscribe<S, T> parent; | |
private SubscriptionProducer(final Subscriber<? super T> subscriber, SyncOnSubscribe<S, T> parent, S state) { | |
this.actualSubscriber = subscriber; | |
this.parent = parent; | |
this.state = state; | |
this.facadeSubscriber = wrapSubscriber(subscriber); | |
actualSubscriber.add(createTerminalSubscription(parent)); | |
} | |
private Subscription createTerminalSubscription(final SyncOnSubscribe<S, T> parent) { | |
return Subscriptions.create(new Action0() { | |
@Override | |
public void call() { | |
if (REQUEST_COUNT.get(SubscriptionProducer.this) == 0) | |
// we are processing the request loop and state may be concurrently modified | |
// execute onUnsub after evaluating next function | |
// PENDING_UNSUBSCRIBE.set(SubscriptionProducer.this, 1); | |
// else | |
// it's safe to process terminal behavior | |
cleanup(); | |
} | |
}); | |
} | |
private Subscriber<T> wrapSubscriber(final Subscriber<? super T> subscriber) { | |
return new Subscriber<T>(subscriber) { | |
@Override | |
public void onCompleted() { | |
if (hasTerminated) { | |
throw new IllegalStateException("Terminal event already emitted."); | |
} | |
hasTerminated = true; | |
if (!actualSubscriber.isUnsubscribed()) { | |
actualSubscriber.onCompleted(); | |
} | |
cleanup(); | |
} | |
@Override | |
public void onError(Throwable e) { | |
if (hasTerminated) { | |
throw new IllegalStateException("Terminal event already emitted."); | |
} | |
hasTerminated = true; | |
if (!actualSubscriber.isUnsubscribed()) { | |
actualSubscriber.onError(e); | |
} | |
cleanup(); | |
} | |
@Override | |
public void onNext(T value) { | |
actualSubscriber.onNext(value); | |
} | |
}; | |
} | |
@Override | |
public void request(long n) { | |
if (n > 0 && BackpressureUtils.getAndAddRequest(REQUEST_COUNT, this, n) == 0) { | |
try { | |
if (n == Long.MAX_VALUE) { | |
// fast-path | |
final SyncOnSubscribe<S, T> p = parent; | |
final Subscriber<? super T> fs = facadeSubscriber; | |
Subscriber<? super T> a = actualSubscriber; | |
S st = state; | |
if (a.isUnsubscribed()) { | |
cleanup(); | |
return; | |
} | |
while (true) { | |
st = p.next(st, fs); | |
if (hasTerminated) { | |
return; | |
} else { | |
if (a.isUnsubscribed()) { | |
cleanup(); | |
return; | |
} | |
} | |
} | |
} else { | |
long r = 0; | |
do { | |
r = requestCount; | |
long numToEmit = r; | |
do { | |
sentCount = 0; | |
state = parent.next(state, facadeSubscriber); | |
if (hasTerminated) { | |
return; | |
} else if (actualSubscriber.isUnsubscribed()) { | |
cleanup(); | |
return; | |
} | |
numToEmit -= sentCount; | |
} while(numToEmit > 0); | |
} while(REQUEST_COUNT.addAndGet(this, -r) > 0); | |
} | |
} catch (Throwable t) { | |
if (!hasTerminated) { | |
hasTerminated = true; | |
if (!actualSubscriber.isUnsubscribed()) { | |
actualSubscriber.onError(t); | |
} | |
cleanup(); | |
} | |
} | |
} | |
} | |
private void cleanup() { | |
parent.onUnsubscribe(state); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment