Last active
August 29, 2015 14:26
-
-
Save akarnokd/dffd7500981968a845d3 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Copyright 2014 Netflix, Inc. | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package rx.observables; | |
import java.util.concurrent.atomic.*; | |
import rx.*; | |
import rx.Observable.OnSubscribe; | |
import rx.annotations.Experimental; | |
import rx.functions.*; | |
import rx.internal.operators.BackpressureUtils; | |
/** | |
* A utility class to create OnSubscribe functions which manages the back pressure concerns that | |
* otherwise aren't handled by {@link rx.Observable#create(OnSubscribe) | |
* Observable.create(OnSubscribe)}. Note that the behavior in the {@code next} argument must | |
* serialize all calls to the downstream subscriber. | |
* | |
* @param <S> | |
* the type of the user-define state used in {@link #generateState() generateState(S)} , | |
* {@link #next(Object, Subscriber) next(S, Subscriber)}, and | |
* {@link #onUnsubscribe(Object) onUnsubscribe(S)}. | |
* @param <T> | |
* the type {@code Subscribers} that will be compatible with {@code this}. | |
*/ | |
@Experimental | |
public abstract class SyncOnSubscribe<S, T> implements OnSubscribe<T> { | |
/** | |
* Produces an initial state value. The default implementation returns {@code null}. | |
* | |
* @return the initial state value that will be received by the {@link #next(Object, Subscriber) | |
* next(S, Subscriber)} function as it's first argument on it's first execution. | |
*/ | |
protected S generateState() { | |
return null; | |
} | |
/** | |
* SyncOnSubscribe iterates over this function to produce values in a way compatible with | |
* back-pressure. Implementations may not call {@link rx.Subscriber#onNext(Object) | |
* subscriber.onNext(T)} concurrently. You may serialize onNext calls over more than one thread, | |
* but concurrent calls to {@code onNext(t)} will result in a runtime exception. Calling | |
* {@link rx.Subscriber#onCompleted() subscriber.onCompleted()} or | |
* {@link rx.Subscriber#onError(Throwable) subscriber.onError(Throwable)} will result in the | |
* downstream subscriber unsubscribing and executing | |
* {@link SyncOnSubscribe#onUnsubscribe(Object) onUnsubscribe(S)}. | |
* | |
* @param state | |
* the value produced by {@link #generateState()} on the first execution or the value | |
* returned from the previous execution of {@code next(Object, Subscriber)} | |
* @param subscriber | |
* down stream data subscriber to receive onNext, onCompleted, or onError | |
* @return the state value to be received by the next execution of this function | |
*/ | |
protected abstract S next(S state, Subscriber<? super T> subscriber); | |
/** | |
* Executed after the downstream subscriber is unsubscribed. This method is useful to override | |
* to clean up the SyncOnSubscribe state. Note that this will not be executed concurrently with | |
* {@link #next(Object, Subscriber) next}. | |
* | |
* @param state | |
* the state value | |
*/ | |
protected void onUnsubscribe(S state) { | |
} | |
/** | |
* Generates a synchronous onSubscribe function that manages back pressure requests delegating | |
* to your {@code next} function/closure to produce data when values are requested by the | |
* downstream subscriber. | |
* | |
* This overload creates a SyncOnSubscribe without an explicit clean up step. | |
* | |
* @param generator | |
* generates the initial state value (see {@link #generateState()}) | |
* @param next | |
* produces data to the downstream subscriber (see {@link #next(Object, Subscriber) | |
* next(S, Subscriber)}) | |
* @return an OnSubscribe that emits data downstream in a protocol compatible with | |
* back-pressure. | |
*/ | |
@Experimental | |
public static <S, T> SyncOnSubscribe<S, T> createWithState(Func0<S> generator, final Action2<S, Subscriber<? super T>> next) { | |
Func2<S, Subscriber<? super T>, S> nextFunc = new Func2<S, Subscriber<? super T>, S>() { | |
@Override | |
public S call(S state, Subscriber<? super T> subscriber) { | |
next.call(state, subscriber); | |
return state; | |
} | |
}; | |
Action1<S> onUnsubscribe = new Action1<S>() { | |
@Override | |
public void call(S t) { | |
} | |
}; | |
return new LambdaOnSubscribe<S, T>(generator, nextFunc, onUnsubscribe); | |
} | |
/** | |
* Generates a synchronous onSubscribe function that manages back pressure requests delegating | |
* to your {@code next} function/closure to produce data when values are requested by the | |
* downstream subscriber. | |
* | |
* This overload creates a SyncOnSubscribe without an explicit clean up step. | |
* | |
* @param generator | |
* generates the initial state value (see {@link #generateState()}) | |
* @param next | |
* produces data to the downstream subscriber (see {@link #next(Object, Subscriber) | |
* next(S, Subscriber)}) | |
* @param onUnsubscribe | |
* clean up behavior (see {@link #onUnsubscribe(Object) onUnsubscribe(S)}) | |
* @return an OnSubscribe that emits data downstream in a protocol compatible with | |
* back-pressure. | |
*/ | |
@Experimental | |
public static <S, T> SyncOnSubscribe<S, T> createWithState(Func0<S> generator, final Action2<S, Subscriber<? super T>> next, final Action1<S> onUnsubscribe) { | |
Func2<S, Subscriber<? super T>, S> nextFunc = new Func2<S, Subscriber<? super T>, S>() { | |
@Override | |
public S call(S state, Subscriber<? super T> subscriber) { | |
next.call(state, subscriber); | |
return state; | |
} | |
}; | |
return new LambdaOnSubscribe<S, T>(generator, nextFunc, onUnsubscribe); | |
} | |
/** | |
* Generates a synchronous onSubscribe function that manages back pressure requests delegating | |
* to your {@code next} function/closure to produce data when values are requested by the | |
* downstream subscriber. | |
* | |
* @param generator | |
* generates the initial state value (see {@link #generateState()}) | |
* @param next | |
* produces data to the downstream subscriber (see {@link #next(Object, Subscriber) | |
* next(S, Subscriber)}) | |
* @return an OnSubscribe that emits data downstream in a protocol compatible with | |
* back-pressure. | |
*/ | |
@Experimental | |
public static <S, T> SyncOnSubscribe<S, T> createWithMutableState(Func0<S> generator, final Func2<S, Subscriber<? super T>, S> next) { | |
Func2<S, Subscriber<? super T>, S> nextFunc = new Func2<S, Subscriber<? super T>, S>() { | |
@Override | |
public S call(S state, Subscriber<? super T> subscriber) { | |
next.call(state, subscriber); | |
return state; | |
} | |
}; | |
Action1<S> onUnsubscribe = new Action1<S>() { | |
@Override | |
public void call(S t) { | |
} | |
}; | |
return new LambdaOnSubscribe<S, T>(generator, nextFunc, onUnsubscribe); | |
} | |
/** | |
* Generates a synchronous onSubscribe function that manages back pressure requests delegating | |
* to your {@code next} function/closure to produce data when values are requested by the | |
* downstream subscriber. | |
* | |
* @param generator | |
* generates the initial state value (see {@link #generateState()}) | |
* @param next | |
* produces data to the downstream subscriber (see {@link #next(Object, Subscriber) | |
* next(S, Subscriber)}) | |
* @param onUnsubscribe | |
* clean up behavior (see {@link #onUnsubscribe(Object) onUnsubscribe(S)}) | |
* @return an OnSubscribe that emits data downstream in a protocol compatible with | |
* back-pressure. | |
*/ | |
@Experimental | |
public static <S, T> SyncOnSubscribe<S, T> createWithMutableState(Func0<S> generator, Func2<? super S, ? super Subscriber<? super T>, ? extends S> next, Action1<? super S> onUnsubscribe) { | |
return new LambdaOnSubscribe<S, T>(generator, next, onUnsubscribe); | |
} | |
/** | |
* Generates a synchronous onSubscribe function that manages back pressure requests delegating | |
* to your {@code next} function/closure to produce data when values are requested by the | |
* downstream subscriber. | |
* | |
* This overload creates a "state-less" SyncOnSubscribe which does not have an explicit state | |
* value. | |
* | |
* @param next | |
* produces data to the downstream subscriber (see {@link #next(Object, Subscriber) | |
* next(S, Subscriber)}) | |
* @return an OnSubscribe that emits data downstream in a protocol compatible with | |
* back-pressure. | |
*/ | |
@Experimental | |
public static <T> SyncOnSubscribe<Void, T> createStateless(final Action1<Subscriber<? super T>> next) { | |
Func2<Void, Subscriber<? super T>, Void> nextFunc = new Func2<Void, Subscriber<? super T>, Void>() { | |
@Override | |
public Void call(Void state, Subscriber<? super T> subscriber) { | |
next.call(subscriber); | |
return state; | |
} | |
}; | |
return new LambdaOnSubscribe<Void, T>(VOID_GENERATOR, nextFunc, VOID_ON_UNSUBSCRIBE); | |
} | |
/** | |
* Generates a synchronous onSubscribe function that manages back pressure requests delegating | |
* to your {@code next} function/closure to produce data when values are requested by the | |
* downstream subscriber. | |
* | |
* This overload creates a SyncOnSubscribe without an explicit state. | |
* | |
* @param next | |
* produces data to the downstream subscriber (see {@link #next(Object, Subscriber) | |
* next(S, Subscriber)}) | |
* @return an OnSubscribe that emits data downstream in a protocol compatible with | |
* back-pressure. | |
*/ | |
@Experimental | |
public static <T> SyncOnSubscribe<Void, T> createStateless(final Action1<Subscriber<? super T>> next, final Action0 onUnsubscribe) { | |
Func0<Void> generator = new Func0<Void>() { | |
@Override | |
public Void call() { | |
return null; | |
} | |
}; | |
Func2<Void, Subscriber<? super T>, Void> nextFunc = new Func2<Void, Subscriber<? super T>, Void>() { | |
@Override | |
public Void call(Void state, Subscriber<? super T> subscriber) { | |
next.call(subscriber); | |
return null; | |
} | |
}; | |
Action1<Void> wrappedOnUnsubscribe = new Action1<Void>() { | |
@Override | |
public void call(Void t) { | |
onUnsubscribe.call(); | |
} | |
}; | |
return new LambdaOnSubscribe<Void, T>(generator, nextFunc, wrappedOnUnsubscribe); | |
} | |
private static final Action1<Void> VOID_ON_UNSUBSCRIBE = new Action1<Void>() { | |
@Override | |
public void call(Void t) { | |
} | |
}; | |
private static final Func0<Void> VOID_GENERATOR = new Func0<Void>() { | |
@Override | |
public Void call() { | |
return null; | |
} | |
}; | |
@Override | |
public final void call(final Subscriber<? super T> subscriber) { | |
SubscriptionProducer<S, T> sp = new SubscriptionProducer<S, T>(subscriber, this, generateState()); | |
subscriber.add(sp); | |
subscriber.setProducer(sp); | |
} | |
/** | |
* An implementation of SyncOnSubscribe that delegates | |
* {@link SyncOnSubscribe#next(Object, Subscriber)}, {@link SyncOnSubscribe#generateState()}, | |
* and {@link SyncOnSubscribe#onUnsubscribe(Object)} to provided functions/closures. | |
* | |
* @param <T> | |
* the type of compatible Subscribers | |
* @param <S> | |
* the type of the user-defined state | |
*/ | |
private static final class LambdaOnSubscribe<S, T> extends SyncOnSubscribe<S, T> { | |
private final Func0<S> generator; | |
private final Func2<? super S, ? super Subscriber<? super T>, ? extends S> next; | |
private final Action1<? super S> onUnsubscribe; | |
private LambdaOnSubscribe(Func0<S> generator, Func2<? super S, ? super Subscriber<? super T>, ? extends S> next, Action1<? super S> onUnsubscribe) { | |
this.generator = generator; | |
this.next = next; | |
this.onUnsubscribe = onUnsubscribe; | |
} | |
@Override | |
protected S generateState() { | |
return generator.call(); | |
} | |
@Override | |
protected void onUnsubscribe(S state) { | |
onUnsubscribe.call(state); | |
} | |
@Override | |
protected S next(S state, Subscriber<? super T> subscriber) { | |
return next.call(state, subscriber); | |
} | |
} | |
/** | |
* Contains the producer loop that reacts to downstream requests of work. | |
* | |
* @param <T> | |
* the type of compatible Subscribers | |
*/ | |
private static class SubscriptionProducer<S, T> | |
extends AtomicLong implements Producer, Subscription { | |
/** */ | |
private static final long serialVersionUID = -3736864024352728072L; | |
private S state; | |
private final SubscriberFacade<? super T> facadeSubscriber; | |
private final Subscriber<? super T> actualSubscriber; | |
private final SyncOnSubscribe<S, T> parent; | |
volatile int unsubscribed; | |
@SuppressWarnings("rawtypes") | |
static final AtomicIntegerFieldUpdater<SubscriptionProducer> UNSUBSCRIBED = | |
AtomicIntegerFieldUpdater.newUpdater(SubscriptionProducer.class, "unsubscribed"); | |
private SubscriptionProducer(final Subscriber<? super T> subscriber, SyncOnSubscribe<S, T> parent, S state) { | |
this.actualSubscriber = subscriber; | |
this.parent = parent; | |
this.state = state; | |
this.facadeSubscriber = new SubscriberFacade<T>(subscriber); | |
} | |
@Override | |
public boolean isUnsubscribed() { | |
return unsubscribed != 0; | |
} | |
@Override | |
public void unsubscribe() { | |
if (UNSUBSCRIBED.compareAndSet(this, 0, 1) && get() == 0L) { | |
// we are processing the request loop and state may be concurrently modified | |
// execute onUnsub after evaluating next function | |
// PENDING_UNSUBSCRIBE.set(SubscriptionProducer.this, 1); | |
// else | |
// it's safe to process terminal behavior | |
parent.onUnsubscribe(state); | |
} | |
} | |
@Override | |
public void request(long n) { | |
if (n > 0 && BackpressureUtils.getAndAddRequest(this, n) == 0L) { | |
if (n == Long.MAX_VALUE) { | |
fastpath(); | |
} else { | |
slowPath(n); | |
} | |
} | |
} | |
void fastpath() { | |
final SyncOnSubscribe<S, T> p = parent; | |
final SubscriberFacade<? super T> fs = facadeSubscriber; | |
Subscriber<? super T> a = actualSubscriber; | |
S st = state; | |
if (a.isUnsubscribed()) { | |
p.onUnsubscribe(st); | |
return; | |
} | |
for (;;) { | |
try { | |
fs.onNextCalled = false; | |
st = p.next(st, fs); | |
} catch (Throwable ex) { | |
if (!fs.hasTerminated) { | |
fs.hasTerminated = true; | |
a.onError(ex); | |
p.onUnsubscribe(st); | |
} | |
return; | |
} | |
if (fs.hasTerminated || a.isUnsubscribed()) { | |
p.onUnsubscribe(st); | |
return; | |
} | |
} | |
} | |
void slowPath(long n) { | |
final SyncOnSubscribe<S, T> p = parent; | |
final SubscriberFacade<? super T> fs = facadeSubscriber; | |
Subscriber<? super T> a = actualSubscriber; | |
long r = n; | |
S st = this.state; | |
for (;;) { | |
if (a.isUnsubscribed()) { | |
p.onUnsubscribe(st); | |
return; | |
} | |
long numToEmit = r; | |
do { | |
try { | |
fs.onNextCalled = false; | |
st = p.next(st, fs); | |
} catch (Throwable ex) { | |
if (!fs.hasTerminated) { | |
fs.hasTerminated = true; | |
a.onError(ex); | |
p.onUnsubscribe(st); | |
} | |
return; | |
} | |
if (fs.hasTerminated || a.isUnsubscribed()) { | |
p.onUnsubscribe(st); | |
return; | |
} | |
numToEmit--; | |
} while (numToEmit != 0L); | |
this.state = st; | |
r = addAndGet(-r); | |
if (r == 0L) { | |
break; | |
} | |
} | |
} | |
static final class SubscriberFacade<T> extends Subscriber<T> { | |
private final Subscriber<? super T> subscriber; | |
boolean onNextCalled; | |
boolean hasTerminated; | |
private SubscriberFacade(Subscriber<? super T> subscriber) { | |
super(subscriber); | |
this.subscriber = subscriber; | |
} | |
@Override | |
public void onCompleted() { | |
if (hasTerminated) { | |
throw new IllegalStateException("Terminal event already emitted."); | |
} | |
hasTerminated = true; | |
if (!subscriber.isUnsubscribed()) { | |
subscriber.onCompleted(); | |
} | |
} | |
@Override | |
public void onError(Throwable e) { | |
if (hasTerminated) { | |
throw new IllegalStateException("Terminal event already emitted."); | |
} | |
hasTerminated = true; | |
if (!subscriber.isUnsubscribed()) { | |
subscriber.onError(e); | |
} | |
} | |
@Override | |
public void onNext(T value) { | |
if (onNextCalled) { | |
throw new IllegalStateException("onNext called multiple times!"); | |
} | |
onNextCalled = true; | |
subscriber.onNext(value); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment