Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save JohnWowUs/d7fc45689ed30333ecd3ee82cc4b6a35 to your computer and use it in GitHub Desktop.
Save JohnWowUs/d7fc45689ed30333ecd3ee82cc4b6a35 to your computer and use it in GitHub Desktop.
import rx.Observable;
import rx.Observable.Operator;
import rx.Subscriber;
import rx.observers.SerializedSubscriber;
/**
 * Operator that relays the source sequence to the child subscriber until a
 * second {@code Observable} terminates.
 *
 * <p>Items emitted by the other {@code Observable} are ignored; only its
 * {@code onCompleted} or {@code onError} signal ends the relayed sequence.
 *
 * @param <T> the type of the items relayed from the source to the child
 * @param <E> the item type of the other {@code Observable}; its items are never inspected
 */
public final class OperatorTakeUntilComplete<T, E> implements Operator<T, T> {

    /** The sequence whose termination ends the relayed sequence. */
    private final Observable<? extends E> other;

    /**
     * Creates the operator.
     *
     * @param other the {@code Observable} whose completion or error terminates the relayed sequence
     */
    public OperatorTakeUntilComplete(final Observable<? extends E> other) {
        this.other = other;
    }

    /**
     * Wires the child to both the source and the other {@code Observable}.
     *
     * @param child the subscriber that receives the relayed events
     * @return the subscriber to be subscribed to the source sequence
     */
    @Override
    public Subscriber<? super T> call(final Subscriber<? super T> child) {
        // Both the source relay and the terminator deliver into this wrapper
        // (presumably so that signals arriving from the two sequences are serialized).
        final Subscriber<T> downstream = new SerializedSubscriber<T>(child, false);
        final Subscriber<T> sourceSubscriber = relayTo(downstream);
        final Subscriber<E> terminator = terminateWith(sourceSubscriber);

        // Registration order is kept: relay, terminator, then the wrapper on the child.
        downstream.add(sourceSubscriber);
        downstream.add(terminator);
        child.add(downstream);

        // The other sequence is subscribed here, before the relay is handed back to the caller.
        other.unsafeSubscribe(terminator);
        return sourceSubscriber;
    }

    /**
     * Builds the subscriber that forwards every source event to {@code downstream}
     * and unsubscribes {@code downstream} after a terminal event, even if the
     * delivery of that terminal event throws.
     */
    private Subscriber<T> relayTo(final Subscriber<T> downstream) {
        return new Subscriber<T>(downstream, false) {
            @Override
            public void onNext(T item) {
                downstream.onNext(item);
            }

            @Override
            public void onError(Throwable error) {
                try {
                    downstream.onError(error);
                } finally {
                    downstream.unsubscribe();
                }
            }

            @Override
            public void onCompleted() {
                try {
                    downstream.onCompleted();
                } finally {
                    downstream.unsubscribe();
                }
            }
        };
    }

    /**
     * Builds the subscriber for the other {@code Observable}: it requests
     * {@code Long.MAX_VALUE} items on start, drops every item, and passes
     * completion or error on to {@code sourceSubscriber}.
     */
    private Subscriber<E> terminateWith(final Subscriber<T> sourceSubscriber) {
        return new Subscriber<E>() {
            @Override
            public void onStart() {
                request(Long.MAX_VALUE);
            }

            @Override
            public void onCompleted() {
                sourceSubscriber.onCompleted();
            }

            @Override
            public void onError(Throwable error) {
                sourceSubscriber.onError(error);
            }

            @Override
            public void onNext(E ignoredItem) {
                // Deliberately empty: items from the other sequence do not end the relay.
            }
        };
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment