Skip to content

Instantly share code, notes, and snippets.

@akarnokd
Last active August 29, 2015 14:26
Show Gist options
  • Save akarnokd/dffd7500981968a845d3 to your computer and use it in GitHub Desktop.
Save akarnokd/dffd7500981968a845d3 to your computer and use it in GitHub Desktop.
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.observables;
import java.util.concurrent.atomic.*;
import rx.*;
import rx.Observable.OnSubscribe;
import rx.annotations.Experimental;
import rx.functions.*;
import rx.internal.operators.BackpressureUtils;
/**
* A utility class to create OnSubscribe functions that manage the back-pressure concerns which
* otherwise aren't handled by {@link rx.Observable#create(OnSubscribe)
* Observable.create(OnSubscribe)}. Note that the behavior in the {@code next} argument must
* serialize all calls to the downstream subscriber.
*
* @param <S>
* the type of the user-defined state used in {@link #generateState() generateState()},
* {@link #next(Object, Subscriber) next(S, Subscriber)}, and
* {@link #onUnsubscribe(Object) onUnsubscribe(S)}.
* @param <T>
* the type of {@code Subscribers} that will be compatible with {@code this}.
*/
@Experimental
public abstract class SyncOnSubscribe<S, T> implements OnSubscribe<T> {
/**
* Produces the initial state value; it is invoked once for every subscriber passed to
* {@link #call(Subscriber)}. The default implementation returns {@code null}.
*
* @return the initial state value that will be received by the {@link #next(Object, Subscriber)
* next(S, Subscriber)} function as its first argument on its first execution.
*/
protected S generateState() {
return null;
}
/**
* Called repeatedly by the producer loop to generate values in a way compatible with
* back-pressure. Each invocation receives the current state and a subscriber that forwards to
* the downstream. Implementations must not call {@link rx.Subscriber#onNext(Object)
* subscriber.onNext(T)} concurrently, and calling {@code onNext(t)} more than once within a
* single invocation results in an {@link IllegalStateException}. Once
* {@link rx.Subscriber#onCompleted() subscriber.onCompleted()} or
* {@link rx.Subscriber#onError(Throwable) subscriber.onError(Throwable)} has been called, the
* loop stops after this method returns and
* {@link SyncOnSubscribe#onUnsubscribe(Object) onUnsubscribe(S)} is executed.
* An exception thrown by this method is forwarded to the downstream subscriber's
* {@code onError} unless a terminal event has already been emitted.
*
* @param state
* the value produced by {@link #generateState()} on the first execution or the value
* returned from the previous execution of {@code next(Object, Subscriber)}
* @param subscriber
* downstream data subscriber to receive onNext, onCompleted, or onError
* @return the state value to be received by the next execution of this function
*/
protected abstract S next(S state, Subscriber<? super T> subscriber);
/**
* Clean-up hook for the SyncOnSubscribe state. It is invoked by the producer when the emission
* loop observes a terminal event or an unsubscribed downstream, and on unsubscription while no
* emission loop is running. Override it to release resources held by the state; the default
* implementation does nothing.
* NOTE(review): the producer is designed so that this does not run concurrently with
* {@link #next(Object, Subscriber) next} - confirm before relying on it.
*
* @param state
* the most recent state value known to the producer
*/
protected void onUnsubscribe(S state) {
}
/**
 * Builds a back-pressure aware {@code SyncOnSubscribe} from a state factory and a
 * side-effecting {@code next} action. The state object produced by {@code generator} is handed
 * to {@code next} on every iteration and is carried over unchanged to the following one.
 *
 * This overload has no clean-up step: nothing happens with the state on unsubscription.
 *
 * @param generator
 *            generates the initial state value (see {@link #generateState()})
 * @param next
 *            produces data to the downstream subscriber (see {@link #next(Object, Subscriber)
 *            next(S, Subscriber)})
 * @return an OnSubscribe that emits data downstream in a protocol compatible with
 *         back-pressure.
 */
@Experimental
public static <S, T> SyncOnSubscribe<S, T> createWithState(Func0<S> generator, final Action2<S, Subscriber<? super T>> next) {
    // No clean-up was supplied, so unsubscription is a no-op for this overload.
    final Action1<S> noCleanup = new Action1<S>() {
        @Override
        public void call(S ignoredState) {
        }
    };
    // Adapt the action to the state-returning shape: the very same state instance is handed back.
    final Func2<S, Subscriber<? super T>, S> sameStateNext = new Func2<S, Subscriber<? super T>, S>() {
        @Override
        public S call(S current, Subscriber<? super T> downstream) {
            next.call(current, downstream);
            return current;
        }
    };
    return new LambdaOnSubscribe<S, T>(generator, sameStateNext, noCleanup);
}
/**
 * Builds a back-pressure aware {@code SyncOnSubscribe} from a state factory, a side-effecting
 * {@code next} action and a clean-up action. The state object produced by {@code generator} is
 * handed to {@code next} on every iteration and is carried over unchanged to the following one.
 *
 * This overload takes an explicit clean-up step that receives the state.
 *
 * @param generator
 *            generates the initial state value (see {@link #generateState()})
 * @param next
 *            produces data to the downstream subscriber (see {@link #next(Object, Subscriber)
 *            next(S, Subscriber)})
 * @param onUnsubscribe
 *            clean up behavior (see {@link #onUnsubscribe(Object) onUnsubscribe(S)})
 * @return an OnSubscribe that emits data downstream in a protocol compatible with
 *         back-pressure.
 */
@Experimental
public static <S, T> SyncOnSubscribe<S, T> createWithState(Func0<S> generator, final Action2<S, Subscriber<? super T>> next, final Action1<S> onUnsubscribe) {
    return new LambdaOnSubscribe<S, T>(
            generator,
            // Adapter: run the action, then hand the very same state instance back.
            new Func2<S, Subscriber<? super T>, S>() {
                @Override
                public S call(S current, Subscriber<? super T> downstream) {
                    next.call(current, downstream);
                    return current;
                }
            },
            onUnsubscribe);
}
/**
 * Generates a synchronous onSubscribe function that manages back pressure requests delegating
 * to your {@code next} function/closure to produce data when values are requested by the
 * downstream subscriber.
 *
 * The value returned by {@code next} becomes the state passed to the following invocation of
 * {@code next}. This overload creates a SyncOnSubscribe without an explicit clean up step.
 *
 * @param generator
 *            generates the initial state value (see {@link #generateState()})
 * @param next
 *            produces data to the downstream subscriber and returns the state for the next
 *            iteration (see {@link #next(Object, Subscriber) next(S, Subscriber)})
 * @return an OnSubscribe that emits data downstream in a protocol compatible with
 *         back-pressure.
 */
@Experimental
public static <S, T> SyncOnSubscribe<S, T> createWithMutableState(Func0<S> generator, final Func2<S, Subscriber<? super T>, S> next) {
    // No clean-up was supplied, so unsubscription is a no-op for this overload.
    Action1<S> onUnsubscribe = new Action1<S>() {
        @Override
        public void call(S t) {
        }
    };
    // Pass next through untouched: wrapping it and returning the old state would discard the
    // state it computes, so the state would never advance between iterations.
    return new LambdaOnSubscribe<S, T>(generator, next, onUnsubscribe);
}
/**
 * Builds a back-pressure aware {@code SyncOnSubscribe} whose {@code next} function returns the
 * state to be used by the following iteration, together with a clean-up action for that state.
 *
 * @param generator
 *            generates the initial state value (see {@link #generateState()})
 * @param next
 *            produces data to the downstream subscriber (see {@link #next(Object, Subscriber)
 *            next(S, Subscriber)})
 * @param onUnsubscribe
 *            clean up behavior (see {@link #onUnsubscribe(Object) onUnsubscribe(S)})
 * @return an OnSubscribe that emits data downstream in a protocol compatible with
 *         back-pressure.
 */
@Experimental
public static <S, T> SyncOnSubscribe<S, T> createWithMutableState(Func0<S> generator, Func2<? super S, ? super Subscriber<? super T>, ? extends S> next, Action1<? super S> onUnsubscribe) {
    // All three callbacks are used as supplied; no adaptation is needed.
    final SyncOnSubscribe<S, T> delegating = new LambdaOnSubscribe<S, T>(generator, next, onUnsubscribe);
    return delegating;
}
/**
 * Builds a back-pressure aware, "state-less" {@code SyncOnSubscribe}: the {@code next} action
 * only receives the downstream subscriber, and there is neither a user state value nor a
 * clean-up step.
 *
 * @param next
 *            produces data to the downstream subscriber (see {@link #next(Object, Subscriber)
 *            next(S, Subscriber)})
 * @return an OnSubscribe that emits data downstream in a protocol compatible with
 *         back-pressure.
 */
@Experimental
public static <T> SyncOnSubscribe<Void, T> createStateless(final Action1<Subscriber<? super T>> next) {
    return new LambdaOnSubscribe<Void, T>(
            VOID_GENERATOR,
            // Adapter: ignore the placeholder state and simply pass it along.
            new Func2<Void, Subscriber<? super T>, Void>() {
                @Override
                public Void call(Void placeholder, Subscriber<? super T> downstream) {
                    next.call(downstream);
                    return placeholder;
                }
            },
            VOID_ON_UNSUBSCRIBE);
}
/**
 * Generates a synchronous onSubscribe function that manages back pressure requests delegating
 * to your {@code next} function/closure to produce data when values are requested by the
 * downstream subscriber.
 *
 * This overload creates a SyncOnSubscribe without an explicit state, but with a clean up step
 * that takes no arguments.
 *
 * @param next
 *            produces data to the downstream subscriber (see {@link #next(Object, Subscriber)
 *            next(S, Subscriber)})
 * @param onUnsubscribe
 *            clean up behavior (see {@link #onUnsubscribe(Object) onUnsubscribe(S)})
 * @return an OnSubscribe that emits data downstream in a protocol compatible with
 *         back-pressure.
 */
@Experimental
public static <T> SyncOnSubscribe<Void, T> createStateless(final Action1<Subscriber<? super T>> next, final Action0 onUnsubscribe) {
    // There is no real state: next ignores the placeholder and always yields null.
    Func2<Void, Subscriber<? super T>, Void> nextFunc = new Func2<Void, Subscriber<? super T>, Void>() {
        @Override
        public Void call(Void state, Subscriber<? super T> subscriber) {
            next.call(subscriber);
            return null;
        }
    };
    // Adapt the zero-argument clean-up to the state-accepting shape.
    Action1<Void> wrappedOnUnsubscribe = new Action1<Void>() {
        @Override
        public void call(Void t) {
            onUnsubscribe.call();
        }
    };
    // Reuse the shared null-producing generator, as the single-argument overload does,
    // instead of allocating an identical one per call.
    return new LambdaOnSubscribe<Void, T>(VOID_GENERATOR, nextFunc, wrappedOnUnsubscribe);
}
/** Shared no-op clean-up action; used by the stateless factory that takes no clean-up step. */
private static final Action1<Void> VOID_ON_UNSUBSCRIBE = new Action1<Void>() {
@Override
public void call(Void t) {
// intentionally empty: there is no state to release
}
};
/** Shared state generator for stateless instances; always yields {@code null}. */
private static final Func0<Void> VOID_GENERATOR = new Func0<Void>() {
@Override
public Void call() {
return null;
}
};
/**
 * Wires a new subscriber: creates a fresh state via {@link #generateState()}, wraps it in a
 * producer, registers that producer as a subscription of the subscriber and finally installs
 * it as the subscriber's producer.
 *
 * @param subscriber
 *            the downstream subscriber to serve
 */
@Override
public final void call(final Subscriber<? super T> subscriber) {
    final S initialState = generateState();
    final SubscriptionProducer<S, T> producer =
            new SubscriptionProducer<S, T>(subscriber, this, initialState);
    // Register for unsubscription first, then hand the producer over for requests.
    subscriber.add(producer);
    subscriber.setProducer(producer);
}
/**
 * A {@code SyncOnSubscribe} whose three hooks - {@link SyncOnSubscribe#generateState()},
 * {@link SyncOnSubscribe#next(Object, Subscriber)} and
 * {@link SyncOnSubscribe#onUnsubscribe(Object)} - simply forward to the functions/closures
 * supplied at construction time.
 *
 * @param <S>
 *            the type of the user-defined state
 * @param <T>
 *            the type of compatible Subscribers
 */
private static final class LambdaOnSubscribe<S, T> extends SyncOnSubscribe<S, T> {
    /** Supplies the initial state. */
    private final Func0<S> stateFactory;
    /** Performs one iteration and yields the state for the following one. */
    private final Func2<? super S, ? super Subscriber<? super T>, ? extends S> nextFunction;
    /** Receives the state for clean-up. */
    private final Action1<? super S> cleanupAction;

    private LambdaOnSubscribe(Func0<S> generator, Func2<? super S, ? super Subscriber<? super T>, ? extends S> next, Action1<? super S> onUnsubscribe) {
        this.stateFactory = generator;
        this.nextFunction = next;
        this.cleanupAction = onUnsubscribe;
    }

    @Override
    protected S generateState() {
        return this.stateFactory.call();
    }

    @Override
    protected S next(S state, Subscriber<? super T> subscriber) {
        final S following = this.nextFunction.call(state, subscriber);
        return following;
    }

    @Override
    protected void onUnsubscribe(S state) {
        this.cleanupAction.call(state);
    }
}
/**
 * Contains the producer loop that reacts to downstream requests of work.
 * <p>
 * The inherited {@code AtomicLong} value holds the outstanding requested amount. Only the
 * {@code request} call that moves it away from zero runs an emission loop; while it is non-zero
 * {@link #unsubscribe()} leaves clean-up to that loop.
 * <p>
 * Every invocation of {@link SyncOnSubscribe#onUnsubscribe(Object)} goes through
 * {@link #cleanup(Object)}, which lets it run at most once per subscription. Without that
 * guard the callback could run twice, for example when the subscriber is already unsubscribed
 * at subscription time (once from {@code unsubscribe()} and again from the first request).
 *
 * @param <S>
 *            the type of the user-defined state
 * @param <T>
 *            the type of compatible Subscribers
 */
private static class SubscriptionProducer<S, T>
        extends AtomicLong implements Producer, Subscription {
    /** */
    private static final long serialVersionUID = -3736864024352728072L;
    /** State as of the end of the last completed slow-path batch (or the initial state). */
    private S state;
    /** Subscriber handed to {@code next}; tracks terminal events and onNext usage. */
    private final SubscriberFacade<? super T> facadeSubscriber;
    /** The real downstream subscriber. */
    private final Subscriber<? super T> actualSubscriber;
    private final SyncOnSubscribe<S, T> parent;
    /** Flipped by the first clean-up so that the user callback is never invoked twice. */
    private final AtomicBoolean cleanedUp = new AtomicBoolean();
    volatile int unsubscribed;
    @SuppressWarnings("rawtypes")
    static final AtomicIntegerFieldUpdater<SubscriptionProducer> UNSUBSCRIBED =
            AtomicIntegerFieldUpdater.newUpdater(SubscriptionProducer.class, "unsubscribed");

    private SubscriptionProducer(final Subscriber<? super T> subscriber, SyncOnSubscribe<S, T> parent, S state) {
        this.actualSubscriber = subscriber;
        this.parent = parent;
        this.state = state;
        this.facadeSubscriber = new SubscriberFacade<T>(subscriber);
    }

    @Override
    public boolean isUnsubscribed() {
        return unsubscribed != 0;
    }

    @Override
    public void unsubscribe() {
        // Only the first call flips the flag. With no outstanding requests no emission loop is
        // using the state, so it is cleaned up right here; otherwise the running loop notices
        // the unsubscription after the current next() call and cleans up itself.
        if (UNSUBSCRIBED.compareAndSet(this, 0, 1) && get() == 0L) {
            cleanup(state);
        }
    }

    @Override
    public void request(long n) {
        // Only the caller that moves the requested amount away from zero starts emitting.
        if (n > 0 && BackpressureUtils.getAndAddRequest(this, n) == 0L) {
            if (n == Long.MAX_VALUE) {
                fastpath();
            } else {
                slowPath(n);
            }
        }
    }

    /** Invokes the user's clean-up callback with the given state, at most once overall. */
    private void cleanup(S finalState) {
        if (cleanedUp.compareAndSet(false, true)) {
            parent.onUnsubscribe(finalState);
        }
    }

    /**
     * Handles an exception thrown by {@code next}: signals it downstream unless a terminal
     * event was already emitted, and cleans up the state in either case.
     */
    private void failAndCleanup(Throwable ex, S lastState) {
        final SubscriberFacade<? super T> fs = facadeSubscriber;
        try {
            if (!fs.hasTerminated) {
                fs.hasTerminated = true;
                actualSubscriber.onError(ex);
            }
            // else: a terminal event was already delivered, so the exception cannot be
            // signalled downstream and is dropped (unchanged behavior).
        } finally {
            // Previously skipped when already terminated (or when onError itself threw),
            // which left the state without clean-up.
            cleanup(lastState);
        }
    }

    /** Unbounded mode: keeps calling {@code next} until termination or unsubscription. */
    void fastpath() {
        final SyncOnSubscribe<S, T> p = parent;
        final SubscriberFacade<? super T> fs = facadeSubscriber;
        final Subscriber<? super T> a = actualSubscriber;
        S st = state;
        if (a.isUnsubscribed()) {
            cleanup(st);
            return;
        }
        for (;;) {
            try {
                fs.onNextCalled = false;
                st = p.next(st, fs);
            } catch (Throwable ex) {
                failAndCleanup(ex, st);
                return;
            }
            if (fs.hasTerminated || a.isUnsubscribed()) {
                cleanup(st);
                return;
            }
        }
    }

    /**
     * Bounded mode: calls {@code next} once per requested item, then subtracts the batch from
     * the requested amount and repeats while more requests arrived in the meantime.
     *
     * @param n
     *            the amount of the request that started this loop
     */
    void slowPath(long n) {
        final SyncOnSubscribe<S, T> p = parent;
        final SubscriberFacade<? super T> fs = facadeSubscriber;
        final Subscriber<? super T> a = actualSubscriber;
        long r = n;
        S st = this.state;
        for (;;) {
            if (a.isUnsubscribed()) {
                cleanup(st);
                return;
            }
            long numToEmit = r;
            do {
                try {
                    fs.onNextCalled = false;
                    st = p.next(st, fs);
                } catch (Throwable ex) {
                    failAndCleanup(ex, st);
                    return;
                }
                if (fs.hasTerminated || a.isUnsubscribed()) {
                    cleanup(st);
                    return;
                }
                numToEmit--;
            } while (numToEmit != 0L);
            // Publish the state before releasing the loop so unsubscribe() sees the latest one.
            this.state = st;
            r = addAndGet(-r);
            if (r == 0L) {
                // An unsubscribe() that arrived while the counter was still non-zero deferred
                // clean-up to this loop; honor it now that the loop is ending.
                if (a.isUnsubscribed()) {
                    cleanup(st);
                }
                break;
            }
        }
    }

    /**
     * The subscriber handed to {@code next}. It forwards events to the real subscriber while
     * recording whether a terminal event was emitted and rejecting a second {@code onNext}
     * within one {@code next} invocation (the loops reset {@code onNextCalled} before each call).
     */
    static final class SubscriberFacade<T> extends Subscriber<T> {
        private final Subscriber<? super T> subscriber;
        /** Whether onNext was already called during the current {@code next} invocation. */
        boolean onNextCalled;
        /** Whether onCompleted or onError was emitted (or an error was signalled by the loop). */
        boolean hasTerminated;

        private SubscriberFacade(Subscriber<? super T> subscriber) {
            super(subscriber);
            this.subscriber = subscriber;
        }

        @Override
        public void onCompleted() {
            if (hasTerminated) {
                throw new IllegalStateException("Terminal event already emitted.");
            }
            hasTerminated = true;
            if (!subscriber.isUnsubscribed()) {
                subscriber.onCompleted();
            }
        }

        @Override
        public void onError(Throwable e) {
            if (hasTerminated) {
                throw new IllegalStateException("Terminal event already emitted.");
            }
            hasTerminated = true;
            if (!subscriber.isUnsubscribed()) {
                subscriber.onError(e);
            }
        }

        @Override
        public void onNext(T value) {
            if (onNextCalled) {
                throw new IllegalStateException("onNext called multiple times!");
            }
            onNextCalled = true;
            subscriber.onNext(value);
        }
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment