Skip to content

Instantly share code, notes, and snippets.

@tehmou
Created January 10, 2015 22:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tehmou/fd44b6f78acee427a076 to your computer and use it in GitHub Desktop.
Save tehmou/fd44b6f78acee427a076 to your computer and use it in GitHub Desktop.
RxJava .concat lost observables riddle
Let's say we have this code that takes a list, makes the items asynchronous observables (i. e. network requests) and then concatenates the results:
final List<String> list = Arrays.asList("1", "2", "3", "4", "5", "6");
Log.d(TAG, "start: " + list);
Observable.concat(
Observable.from(list)
.map(s -> {
final Subject<String, String> subject = PublishSubject.create();
final int delay = (list.size() - list.indexOf(s)) * 100;
Log.d(TAG, "delay: " + s + ", " + delay + "ms");
new Handler().postDelayed(() -> {
Log.d(TAG, s);
subject.onNext(s);
subject.onCompleted();
}, delay);
return subject;
}))
.toList()
.subscribe(strings -> Log.d(TAG, "result: " + strings));
The results in the log might be surprising (with RxJava 1.0.4):
start: [1, 2, 3, 4, 5, 6]
delay: 1, 600ms
delay: 2, 500ms
2
1
delay: 3, 400ms
delay: 4, 300ms
4
3
delay: 5, 200ms
delay: 6, 100ms
6
5
result: [1, 3, 5]
The reason for this is the implementation of the Observable.concat and the use of the PublishSubject. The concat subscribes to the observable on the list it is given in a sequential order, and it can be that the onNext/onComplete of the containing observables have already passed - the PublishSubject does not return the last value it received for new subscriptions. One way to fix the issue here is to use an AsyncSubject or a ReplaySubject, which "cache" the last value for the concat to consume.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment