Skip to content

Instantly share code, notes, and snippets.

@thomasnield
Created August 31, 2015 13:56
Show Gist options
  • Save thomasnield/c5e640bd917ebd86678c to your computer and use it in GitHub Desktop.
Save thomasnield/c5e640bd917ebd86678c to your computer and use it in GitHub Desktop.
Consolidated Infinite Observable<List<T>> Test
import com.sun.xml.internal.ws.policy.privateutil.PolicyUtils;
import rx.Observable;
import rx.functions.FuncN;
import rx.subjects.BehaviorSubject;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
 * Small RxJava experiment: a never-completing stream of parent lists, where every parent exposes
 * a never-completing stream of child lists, is consolidated into one stream that emits the
 * flattened list of children belonging to the most recent parent list.
 *
 * <p>Running {@link #main(String[])} prints the size of each consolidated child list.
 */
public final class NestedInfiniteTest {

    /** Each emitted value is the number of {@link Parent} instances in the next parent list. */
    private static final BehaviorSubject<Integer> parentSubject = BehaviorSubject.create(1);

    /** Each emitted value is the number of {@link Child} instances every parent reports next. */
    private static final BehaviorSubject<Integer> childSubject = BehaviorSubject.create(1);

    /**
     * Combiner handed to {@code combineLatest}: concatenates the latest child list of every
     * parent, in argument order, into one new list.
     *
     * <p>{@code FuncN} only sees {@code Object} arguments, so each argument is viewed as a
     * {@code List<?>} and every element is cast individually. This avoids the unchecked
     * {@code (List<Child>)} cast while still failing fast on a wrongly typed element.
     */
    private static final FuncN<List<Child>> MERGE_CHILD_LISTS = latestLists -> {
        ArrayList<Child> merged = new ArrayList<>();
        for (Object latest : latestLists) {
            for (Object element : (List<?>) latest) {
                merged.add((Child) element);
            }
        }
        return merged;
    };

    /**
     * Wires the streams together, subscribes a printer, and then pushes a few size changes
     * through both subjects.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // NOTE(review): in RxJava 1.x the int passed to cache(...) is documented as an initial
        // capacity hint rather than a replay limit -- confirm cache(1) matches the intent here.
        Observable<List<Parent>> infiniteParentListStream = parentSubject
                .map(count -> IntStream.range(0, count)
                        .mapToObj(index -> new Parent())
                        .collect(Collectors.toList()))
                .cache(1);

        // switchMap instead of flatMap: flatMap would keep the combined stream of every earlier
        // parent list subscribed, so superseded parent lists would keep emitting next to the
        // current one. switchMap drops the previous combination when a new parent list arrives.
        Observable<List<Child>> allCurrentChildren =
                infiniteParentListStream.switchMap(NestedInfiniteTest::consolidateChildren);

        allCurrentChildren.subscribe(cl -> System.out.println("WHOLE CHILD LIST SIZE: " + cl.size()));

        childSubject.onNext(10);
        parentSubject.onNext(5);
        childSubject.onNext(2);
    }

    /**
     * Builds the stream of flattened child lists for one fixed list of parents.
     *
     * @param parents the parents whose child-list streams are combined
     * @return a stream emitting the concatenation of the latest child list of every parent
     */
    private static Observable<List<Child>> consolidateChildren(List<Parent> parents) {
        // The parent list is already materialised, so the per-parent streams can be collected
        // directly; no hand-written Observable.create(...).toList() round trip is needed.
        List<Observable<List<Child>>> childStreams = parents.stream()
                .map(Parent::getInfiniteChildListStream)
                .collect(Collectors.toList());
        return Observable.combineLatest(childStreams, MERGE_CHILD_LISTS);
    }

    /** A parent whose list of children is re-created whenever {@code childSubject} emits. */
    private static final class Parent {

        // NOTE(review): same cache(1) caveat as in main -- verify the argument's meaning.
        private final Observable<List<Child>> infiniteChildListStream = childSubject
                .map(count -> IntStream.range(0, count)
                        .mapToObj(index -> new Child())
                        .collect(Collectors.toList()))
                .cache(1);

        /**
         * @return this parent's stream of child lists
         */
        public Observable<List<Child>> getInfiniteChildListStream() {
            return infiniteChildListStream;
        }
    }

    /** Marker type standing in for a child entity; carries no state. */
    private static final class Child {
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment