Created
June 4, 2016 12:38
-
-
Save JohnWowUs/41ffa8c74d520c662766acd0c747b677 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import rx.*; | |
import rx.functions.Func1; | |
import rx.internal.producers.ProducerArbiter; | |
import rx.subscriptions.SerialSubscription; | |
public final class OperatorSwitchIfMatch<T> implements Observable.Operator<T, T> { | |
private final Observable<? extends T> alternate; | |
private final Func1<T, Boolean> selector ; | |
public OperatorSwitchIfMatch(Observable<? extends T> alternate, Func1<T, Boolean> selector) { | |
this.alternate = alternate; | |
this.selector = selector; | |
} | |
@Override | |
public Subscriber<? super T> call(Subscriber<? super T> child) { | |
final SerialSubscription ssub = new SerialSubscription(); | |
ProducerArbiter arbiter = new ProducerArbiter(); | |
final ParentSubscriber<T> parent = new ParentSubscriber<T>(child, ssub, arbiter, alternate,selector); | |
ssub.set(parent); | |
child.add(ssub); | |
child.setProducer(arbiter); | |
return parent; | |
} | |
private static final class ParentSubscriber<T> extends Subscriber<T> { | |
private boolean matched = true; | |
private final Subscriber<? super T> child; | |
private final SerialSubscription ssub; | |
private final ProducerArbiter arbiter; | |
private final Observable<? extends T> alternate; | |
private final Func1<T, Boolean> selector ; | |
ParentSubscriber(Subscriber<? super T> child, final SerialSubscription ssub, ProducerArbiter arbiter, Observable<? extends T> alternate, Func1<T, Boolean> selector) { | |
this.child = child; | |
this.ssub = ssub; | |
this.arbiter = arbiter; | |
this.alternate = alternate; | |
this.selector = selector; | |
} | |
@Override | |
public void setProducer(final Producer producer) { | |
arbiter.setProducer(producer); | |
} | |
@Override | |
public void onCompleted() { | |
child.onCompleted(); | |
} | |
private void subscribeToAlternate() { | |
AlternateSubscriber<T> as = new AlternateSubscriber<T>(child, arbiter); | |
ssub.set(as); | |
alternate.unsafeSubscribe(as); | |
} | |
@Override | |
public void onError(Throwable e) { | |
child.onError(e); | |
} | |
@Override | |
public void onNext(T t) { | |
if (selector.call(t)) { | |
if (!child.isUnsubscribed()) { | |
subscribeToAlternate(); | |
} | |
} else { | |
child.onNext(t); | |
} | |
arbiter.produced(1); | |
} | |
} | |
private static final class AlternateSubscriber<T> extends Subscriber<T> { | |
private final ProducerArbiter arbiter; | |
private final Subscriber<? super T> child; | |
AlternateSubscriber(Subscriber<? super T> child, ProducerArbiter arbiter) { | |
this.child = child; | |
this.arbiter = arbiter; | |
} | |
@Override | |
public void setProducer(final Producer producer) { | |
arbiter.setProducer(producer); | |
} | |
@Override | |
public void onCompleted() { | |
child.onCompleted(); | |
} | |
@Override | |
public void onError(Throwable e) { | |
child.onError(e); | |
} | |
@Override | |
public void onNext(T t) { | |
child.onNext(t); | |
arbiter.produced(1); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment