Skip to content

Instantly share code, notes, and snippets.

@keima
Last active February 27, 2017 08:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save keima/3e311b93c3d202af9661717bd3ced76b to your computer and use it in GitHub Desktop.
Save keima/3e311b93c3d202af9661717bd3ced76b to your computer and use it in GitHub Desktop.
TestSubscriber<List<String>> testSubscriber = new TestSubscriber<>();
long startTime = new Date().getTime(), endTime = 0;
System.out.println("Start.");
Observable.merge(
asyncObservable("uno", 4).map(new Func1<String, Pair<Integer, String>>() {
@Override
public Pair<Integer, String> call(String s) {
return Pair.create(0, s);
}
}),
asyncObservable("dos", 3).map(new Func1<String, Pair<Integer, String>>() {
@Override
public Pair<Integer, String> call(String s) {
return Pair.create(1, s);
}
}),
asyncObservable("tres", 2).map(new Func1<String, Pair<Integer, String>>() {
@Override
public Pair<Integer, String> call(String s) {
return Pair.create(2, s);
}
}),
asyncObservable("cuatro", 1).map(new Func1<String, Pair<Integer, String>>() {
@Override
public Pair<Integer, String> call(String s) {
return Pair.create(3, s);
}
})
).toList().flatMap(new Func1<List<Pair<Integer, String>>, Observable<List<String>>>() {
@Override
public Observable<List<String>> call(List<Pair<Integer, String>> pairs) {
Collections.sort(pairs, new Comparator<Pair<Integer, String>>() {
@Override
public int compare(Pair<Integer, String> lhs, Pair<Integer, String> rhs) {
if (lhs.first > rhs.first) {
return 1;
} else if (lhs.first < rhs.first) {
return -1;
}
return 0;
}
});
List<String> sorted = new ArrayList<>();
for (Pair<Integer, String> pair : pairs) {
sorted.add(pair.second);
}
return Observable.just(sorted);
}
}).doOnNext(new Action1<List<String>>() {
@Override
public void call(List<String> strings) {
System.out.println("onNext: " + strings);
}
}).doOnCompleted(new Action0() {
@Override
public void call() {
System.out.println("onCompleted");
}
}).doOnError(new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
throwable.printStackTrace();
}
}).subscribe(testSubscriber);
testSubscriber.awaitTerminalEvent();
testSubscriber.assertCompleted();
testSubscriber.assertNoErrors();
endTime = new Date().getTime();
System.out.println("End: " + (endTime - startTime) + "ms");
// ---
private Observable<String> asyncObservable(String msg, int wait) {
return Observable.just(msg).delay(wait, TimeUnit.SECONDS, Schedulers.newThread());
}
Start.
onNext: [uno, dos, tres, cuatro]
onCompleted
End: 4131ms
Process finished with exit code 0
@keima
Copy link
Author

keima commented Feb 27, 2017

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment