Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save benjchristensen/e19d64654ebbe82c5bb1 to your computer and use it in GitHub Desktop.
Save benjchristensen/e19d64654ebbe82c5bb1 to your computer and use it in GitHub Desktop.
ChooseSubjectBasedOnFirstValue
package perf.backend;
import rx.Observable;
import rx.Observable.Operator;
import rx.Subscriber;
import rx.subjects.AsyncSubject;
import rx.subjects.ReplaySubject;
import rx.subjects.Subject;
/**
 * Demo: pick a {@link Subject} implementation based on the first value emitted by a source
 * {@link Observable}, then route every source event through that subject.
 *
 * <p>The custom operator emits exactly one {@code Subject} downstream (followed by
 * {@code onCompleted}) the first time the source emits a value. If the first value is a
 * {@link Number} an {@link AsyncSubject} is used, otherwise a {@link ReplaySubject}.
 * If the source terminates before emitting any value, no subject is created and the terminal
 * event is forwarded directly to the downstream subscriber.
 */
public class ChooseSubjectBasedOnFirstValue {

    public static void main(String[] args) {
        // Alternative source whose first value is a Number (selects the AsyncSubject branch):
        // Observable<Object> o = Observable.just(1);
        Observable<Object> o = Observable.from("A", "B", "C");

        o.lift(new Operator<Subject<Object, Object>, Object>() {
            @Override
            public Subscriber<? super Object> call(Subscriber<? super Subject<Object, Object>> child) {
                return new Subscriber<Object>(child) {
                    // Created lazily on the first onNext; null until the source emits a value.
                    private Subject<Object, Object> s;

                    @Override
                    public void onCompleted() {
                        if (s == null) {
                            // Source was empty: there is no subject to complete, so complete
                            // the downstream subscriber instead of dereferencing null.
                            child.onCompleted();
                            return;
                        }
                        s.onCompleted();
                    }

                    @Override
                    public void onError(Throwable e) {
                        if (s == null) {
                            // Source failed before emitting anything: propagate the error
                            // downstream rather than throwing a NullPointerException.
                            child.onError(e);
                            return;
                        }
                        s.onError(e);
                    }

                    @Override
                    public void onNext(Object t) {
                        if (s == null) {
                            // First value decides which Subject implementation carries the stream.
                            if (t instanceof Number) {
                                s = AsyncSubject.create();
                            } else {
                                s = ReplaySubject.create();
                            }
                            // Hand the subject downstream exactly once, then finish the outer
                            // stream; all further events travel through the subject itself.
                            child.onNext(s);
                            child.onCompleted();
                        }
                        s.onNext(t);
                    }
                };
            }
        }).flatMap((Subject<Object, Object> s) -> {
            System.out.println("Subject: " + s);
            // Two independent subscriptions to the same subject, each with its own prefix.
            s.map(i -> "1) => " + i).subscribe(System.out::println);
            s.map(i -> "2) => " + i).subscribe(System.out::println);
            return s;
        }).subscribe();
    }
}
@daschl
Copy link

daschl commented May 7, 2014

Wow thanks much for your efforts!

Will try it tomorrow morning and report success or failure :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment