Skip to content

Instantly share code, notes, and snippets.

@benjchristensen
Created May 20, 2014 03:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save benjchristensen/f7969657650126b000be to your computer and use it in GitHub Desktop.
Save benjchristensen/f7969657650126b000be to your computer and use it in GitHub Desktop.
ConcatIfEmpty
import rx.Notification;
import rx.Observable;
import rx.Observable.Operator;
import rx.Subscriber;
import rx.functions.Func1;
/**
 * Demonstrates two ways to build a "use the second Observable only if the first one is empty"
 * combinator: one as a custom {@link Operator} applied with {@code lift}, and one composed from
 * {@code materialize} + {@code flatMap}.
 *
 * <p>Running {@link #main(String[])} prints six groups of two lines separated by dashed lines;
 * each group is the output of one combinator applied to a (first, fallback) pair.
 */
public class ConcatIfEmpty {

    /** Line printed between the individual demo runs. */
    private static final String SEPARATOR = "--------------------";

    /**
     * Runs both implementations against a non-empty source and against an empty source
     * (with two different fallbacks) and prints whatever each resulting Observable emits.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        Observable<String> empty = Observable.empty();
        final Observable<String> o1 = Observable.from("hello", "world");
        final Observable<String> o2 = Observable.from("HELLO", "WORLD");

        concatIfEmptyViaMaterialize(o1, o2).subscribe(System.out::println);
        System.out.println(SEPARATOR);
        concatIfEmptyViaMaterialize(empty, o1).subscribe(System.out::println);
        System.out.println(SEPARATOR);
        concatIfEmptyViaMaterialize(empty, o2).subscribe(System.out::println);
        System.out.println(SEPARATOR);
        concatIfEmptyViaOperator(o1, o2).subscribe(System.out::println);
        System.out.println(SEPARATOR);
        concatIfEmptyViaOperator(empty, o1).subscribe(System.out::println);
        System.out.println(SEPARATOR);
        concatIfEmptyViaOperator(empty, o2).subscribe(System.out::println);
    }

    /**
     * Emits everything from {@code o1}; if {@code o1} completes without having emitted any
     * value, subscribes the downstream to {@code o2} instead of completing.
     *
     * <p>Implemented as a custom operator. The "did we see a value" flag lives in the
     * Subscriber created by {@code call}, i.e. it is created anew each time the operator
     * function is invoked.
     *
     * @param o1 the primary source
     * @param o2 the fallback source, subscribed to only when {@code o1} completes empty
     * @return an Observable emitting {@code o1}'s items, or {@code o2}'s if {@code o1} was empty
     */
    private static Observable<String> concatIfEmptyViaOperator(final Observable<String> o1, final Observable<String> o2) {
        return o1.lift(new Operator<String, String>() {
            @Override
            public Subscriber<? super String> call(final Subscriber<? super String> child) {
                return new Subscriber<String>(child) {
                    boolean receivedOnNext = false;

                    @Override
                    public void onCompleted() {
                        if (receivedOnNext) {
                            child.onCompleted();
                        } else if (!child.isUnsubscribed()) {
                            // never received a value so let's use the second Observable;
                            // skipped when the downstream is already gone so that o2's
                            // subscription side effects are not triggered for nobody
                            o2.unsafeSubscribe(child);
                        }
                    }

                    @Override
                    public void onError(Throwable e) {
                        child.onError(e);
                    }

                    @Override
                    public void onNext(String t) {
                        receivedOnNext = true;
                        child.onNext(t);
                    }
                };
            }
        });
    }

    /**
     * Emits everything from {@code o1}; if {@code o1} completes without having emitted any
     * value, continues with {@code o2}. Errors from {@code o1} are passed through and
     * {@code o2} is not used in that case.
     *
     * <p>Implemented by materializing {@code o1} into notifications and mapping each one.
     * The mapping function carries a mutable flag, so it is built inside
     * {@code Observable.defer}: every subscription gets its own function instance and the
     * flag set by one subscription cannot suppress the fallback for another.
     *
     * @param o1 the primary source
     * @param o2 the fallback source, used only when {@code o1} completes empty
     * @return an Observable emitting {@code o1}'s items, or {@code o2}'s if {@code o1} was empty
     */
    private static Observable<String> concatIfEmptyViaMaterialize(final Observable<String> o1, final Observable<String> o2) {
        return Observable.defer(() -> {
            // One mapper (and therefore one hasValues flag) per subscription.
            final Func1<Notification<String>, Observable<String>> perSubscriptionMapper =
                    new Func1<Notification<String>, Observable<String>>() {
                        private boolean hasValues = false;

                        @Override
                        public Observable<String> call(Notification<String> n) {
                            if (n.isOnNext()) {
                                hasValues = true;
                                return Observable.just(n.getValue());
                            } else if (n.isOnCompleted()) {
                                if (hasValues) {
                                    // complete
                                    return Observable.empty();
                                } else {
                                    // continue with the second Observable
                                    return o2;
                                }
                            } else {
                                // emit the error and don't bother with the second Observable
                                return Observable.error(n.getThrowable());
                            }
                        }
                    };
            return o1.materialize().flatMap(perSubscriptionMapper);
        });
    }
}
@benjchristensen
Copy link
Author

This emits:

hello
world
--------------------
hello
world
--------------------
HELLO
WORLD
--------------------
hello
world
--------------------
hello
world
--------------------
HELLO
WORLD

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment