Skip to content

Instantly share code, notes, and snippets.

@akarnokd
Created August 7, 2015 19:43
Show Gist options
  • Save akarnokd/a2669817708827350663 to your computer and use it in GitHub Desktop.
Save akarnokd/a2669817708827350663 to your computer and use it in GitHub Desktop.
/**
 * {@link Producer} that drives a {@link SyncOnSubscribe} generator in response to
 * downstream requests.
 *
 * <p>The generator is handed a facade subscriber instead of the real one. The facade
 * forwards events to the actual subscriber, counts the values emitted so that the
 * bounded request loop can honor the requested amount, and runs the parent's
 * {@code onUnsubscribe} hook when a terminal event is emitted.
 *
 * <p>{@code requestCount} doubles as the "drain loop in progress" flag: only the caller
 * that moves it away from zero enters the emission loop.
 *
 * @param <S> type of the state threaded through successive {@code next} calls
 * @param <T> type of the emitted values
 */
private static class SubscriptionProducer<S, T> implements Producer {
    @SuppressWarnings("rawtypes")
    private static final AtomicLongFieldUpdater<SubscriptionProducer> REQUEST_COUNT =
            AtomicLongFieldUpdater.newUpdater(SubscriptionProducer.class, "requestCount");

    /** Outstanding requested amount; non-zero while an emission loop owns this producer. */
    private volatile long requestCount = 0L;
    /** Set once the generator has emitted onCompleted/onError (or next() has thrown). */
    private boolean hasTerminated = false;
    /** Number of values the generator pushed through the facade during the current next() call. */
    private long sentCount;
    /** Latest generator state; passed to the parent's onUnsubscribe hook on cleanup. */
    private S state;
    private final Subscriber<? super T> facadeSubscriber;
    private final Subscriber<? super T> actualSubscriber;
    private final SyncOnSubscribe<S, T> parent;

    /**
     * @param subscriber the downstream subscriber that receives the events
     * @param parent     the generator whose {@code next}/{@code onUnsubscribe} are invoked
     * @param state      the initial generator state
     */
    private SubscriptionProducer(final Subscriber<? super T> subscriber, SyncOnSubscribe<S, T> parent, S state) {
        this.actualSubscriber = subscriber;
        this.parent = parent;
        this.state = state;
        this.facadeSubscriber = wrapSubscriber(subscriber);
        actualSubscriber.add(createTerminalSubscription(parent));
    }

    /**
     * Creates the subscription that reacts to the downstream unsubscribing.
     *
     * @param parent the generator (unused here; cleanup goes through the {@code parent} field)
     * @return a subscription whose unsubscribe action runs cleanup when no emission loop is active
     */
    private Subscription createTerminalSubscription(final SyncOnSubscribe<S, T> parent) {
        return Subscriptions.create(new Action0() {
            @Override
            public void call() {
                // A non-zero request count means an emission loop is (or was) in charge of this
                // producer and the state may be concurrently modified; that loop checks
                // isUnsubscribed() after every next() call and performs the cleanup itself.
                // Only when nothing is outstanding is it safe to process terminal behavior here.
                if (REQUEST_COUNT.get(SubscriptionProducer.this) == 0) {
                    cleanup();
                }
            }
        });
    }

    /**
     * Builds the facade handed to the generator.
     *
     * @param subscriber the downstream subscriber to wrap
     * @return a subscriber that forwards events, counts emitted values and enforces
     *         at most one terminal event
     */
    private Subscriber<T> wrapSubscriber(final Subscriber<? super T> subscriber) {
        return new Subscriber<T>(subscriber) {
            @Override
            public void onCompleted() {
                if (hasTerminated) {
                    throw new IllegalStateException("Terminal event already emitted.");
                }
                hasTerminated = true;
                if (!actualSubscriber.isUnsubscribed()) {
                    actualSubscriber.onCompleted();
                }
                cleanup();
            }

            @Override
            public void onError(Throwable e) {
                if (hasTerminated) {
                    throw new IllegalStateException("Terminal event already emitted.");
                }
                hasTerminated = true;
                if (!actualSubscriber.isUnsubscribed()) {
                    actualSubscriber.onError(e);
                }
                cleanup();
            }

            @Override
            public void onNext(T value) {
                // Count the emission so the bounded loop in request() can track the
                // remaining demand.
                sentCount++;
                actualSubscriber.onNext(value);
            }
        };
    }

    /**
     * Adds {@code n} to the outstanding demand and, if no emission loop was active, runs one.
     * Non-positive amounts are ignored. Any throwable escaping the generator is delivered
     * to the downstream as {@code onError} unless a terminal event was already emitted.
     *
     * @param n the number of additional values requested
     */
    @Override
    public void request(long n) {
        // NOTE(review): getAndAddRequest presumably returns the previous count, so only the
        // caller that observes 0 starts draining - confirm against BackpressureUtils.
        if (n > 0 && BackpressureUtils.getAndAddRequest(REQUEST_COUNT, this, n) == 0) {
            try {
                if (n == Long.MAX_VALUE) {
                    emitUnbounded();
                } else {
                    emitBounded();
                }
            } catch (Throwable t) {
                if (!hasTerminated) {
                    hasTerminated = true;
                    if (!actualSubscriber.isUnsubscribed()) {
                        actualSubscriber.onError(t);
                    }
                    cleanup();
                }
            }
        }
    }

    /** Fast path: calls the generator until it terminates or the downstream unsubscribes. */
    private void emitUnbounded() {
        final SyncOnSubscribe<S, T> p = parent;
        final Subscriber<? super T> fs = facadeSubscriber;
        final Subscriber<? super T> a = actualSubscriber;
        if (a.isUnsubscribed()) {
            cleanup();
            return;
        }
        while (true) {
            // Store the new state in the field (not just a local) so that cleanup()
            // hands the latest state to the parent's onUnsubscribe hook.
            state = p.next(state, fs);
            if (hasTerminated) {
                return;
            }
            if (a.isUnsubscribed()) {
                cleanup();
                return;
            }
        }
    }

    /**
     * Bounded path: emits the currently requested amount, then subtracts it from the
     * outstanding demand and repeats if further requests arrived in the meantime.
     */
    private void emitBounded() {
        long r;
        do {
            r = requestCount;
            long numToEmit = r;
            do {
                sentCount = 0;
                state = parent.next(state, facadeSubscriber);
                if (hasTerminated) {
                    return;
                } else if (actualSubscriber.isUnsubscribed()) {
                    cleanup();
                    return;
                }
                numToEmit -= sentCount;
            } while (numToEmit > 0);
        } while (REQUEST_COUNT.addAndGet(this, -r) > 0);
    }

    /** Invokes the parent's unsubscribe hook with the current generator state. */
    private void cleanup() {
        parent.onUnsubscribe(state);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment