Created January 10, 2015 22:20
RxJava .concat lost observables riddle
Let's say we have this code that takes a list, turns each item into an asynchronous observable (e.g. a network request) and then concatenates the results:
    final List<String> list = Arrays.asList("1", "2", "3", "4", "5", "6");
    Log.d(TAG, "start: " + list);
    Observable.concat(
            Observable.from(list)
                    .map(s -> {
                        final Subject<String, String> subject = PublishSubject.create();
                        final int delay = (list.size() - list.indexOf(s)) * 100;
                        Log.d(TAG, "delay: " + s + ", " + delay + "ms");
                        new Handler().postDelayed(() -> {
                            Log.d(TAG, s);
                            subject.onNext(s);
                            subject.onCompleted();
                        }, delay);
                        return subject;
                    }))
            .toList()
            .subscribe(strings -> Log.d(TAG, "result: " + strings));
The results in the log might be surprising (with RxJava 1.0.4):

    start: [1, 2, 3, 4, 5, 6]
    delay: 1, 600ms
    delay: 2, 500ms
    2
    1
    delay: 3, 400ms
    delay: 4, 300ms
    4
    3
    delay: 5, 200ms
    delay: 6, 100ms
    6
    5
    result: [1, 3, 5]
The reason is the way Observable.concat is implemented, combined with the use of PublishSubject. concat subscribes to the observables it is given one at a time, in order, and moves on to the next one only after the current one has completed. By the time it subscribes to an inner observable, that observable's onNext/onCompleted may already have fired. A PublishSubject does not replay earlier values to new subscribers, so those items are lost. One way to fix this is to use an AsyncSubject, which caches the last value, or a ReplaySubject, which caches every value, so that concat can still consume it when it subscribes.
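The lost-emission behaviour described above can be reproduced without RxJava or Android. The following is a minimal sketch in plain Java: `PublishLike`, `ReplayLike` and `lateSubscribe` are hypothetical stand-ins written for this illustration, not the RxJava classes, and they model only the one property that matters here, namely whether a subscriber that arrives late still sees a value emitted earlier.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified stand-ins for illustration only; these are NOT the RxJava classes.
class SubjectModel {

    /** Forwards a value only to subscribers present at emission time (PublishSubject-like). */
    static class PublishLike {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        void onNext(String value) {
            for (Consumer<String> subscriber : subscribers) {
                subscriber.accept(value);
            }
        }

        void subscribe(Consumer<String> subscriber) {
            subscribers.add(subscriber);
        }
    }

    /** Also records every value and replays the history to late subscribers (ReplaySubject-like). */
    static class ReplayLike extends PublishLike {
        private final List<String> history = new ArrayList<>();

        @Override
        void onNext(String value) {
            history.add(value);
            super.onNext(value);
        }

        @Override
        void subscribe(Consumer<String> subscriber) {
            for (String value : history) {
                subscriber.accept(value); // replay what was missed
            }
            super.subscribe(subscriber);
        }
    }

    /** Emits "2" before anyone subscribes, then subscribes and returns what was received. */
    static List<String> lateSubscribe(PublishLike subject) {
        subject.onNext("2");              // the inner observable fires early
        List<String> received = new ArrayList<>();
        subject.subscribe(received::add); // the concat-style consumer arrives only now
        return received;
    }

    public static void main(String[] args) {
        System.out.println("publish-like: " + lateSubscribe(new PublishLike())); // prints "publish-like: []"
        System.out.println("replay-like:  " + lateSubscribe(new ReplayLike()));  // prints "replay-like:  [2]"
    }
}
```

In the original snippet, the equivalent change would presumably be to replace `PublishSubject.create()` with `ReplaySubject.create()` or `AsyncSubject.create()`, leaving the rest of the chain untouched.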