In this post, I will highlight some points to be aware of when migrating from RxJava v1.x to v2.x.
The post is divided into 3 sections:
- The Good
  - Naming
  - Coexistence
  - Interoperability
- The Bad
  - Nulls In The Stream
- The Ugly
  - API Major Changes
  - Operator Differences
  - Catching Errors
Some names changed, for example the functional interfaces:
- Func1 vs Function
// RxJava 1.x: Func1.call() cannot throw checked exceptions
rx.Observable.just("Hello")
    .map(new Func1<String, String>() {
        @Override
        public String call(String s) {
            return s.toUpperCase();
        }
    });
// RxJava 2.x: Function.apply() is declared with "throws Exception"
io.reactivex.Observable.just("Hello")
    .map(new Function<String, String>() {
        @Override
        public String apply(String s) throws Exception {
            return s.toUpperCase();
        }
    });
The same code written with a lambda does not require any changes.
// RxJava 2.x
io.reactivex.Observable.just("Hello")
    .map(s -> s.toUpperCase());
// RxJava 1.x
rx.Observable.just("Hello")
    .map(s -> s.toUpperCase());
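v1.x and v2.x can live together in the same project because their package names are different (rx.* for 1.x, io.reactivex.* for 2.x). Within a single source file, import only one of the two Observable classes and refer to the other by its fully qualified name.
// In a file that uses RxJava 2.x
import io.reactivex.Observable;
import io.reactivex.functions.Function;
// In a file that uses RxJava 1.x
import rx.Observable;
import rx.functions.Func0;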
Convert 1.x types to 2.x types and vice versa with RxJava2Interop:
import hu.akarnokd.rxjava.interop.RxJavaInterop;
// convert from 1.x to 2.x
io.reactivex.Flowable f2 = RxJavaInterop.toV2Flowable(rx.Observable);
io.reactivex.Observable o2 = RxJavaInterop.toV2Observable(rx.Observable);
...
// convert from 2.x to 1.x
rx.Observable o1 = RxJavaInterop.toV1Observable(Publisher);
rx.Observable q1 = RxJavaInterop.toV1Observable(ObservableSource, BackpressureStrategy);
...
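A concrete round trip might look like the sketch below. It assumes that RxJava 1.x, RxJava 2.x and the RxJava2Interop library are all on the classpath. The class and method names (InteropRoundTrip, upperCaseViaV2) are made up for illustration, and BackpressureStrategy.BUFFER is just one possible choice.

```java
import hu.akarnokd.rxjava.interop.RxJavaInterop;
import io.reactivex.BackpressureStrategy;

// Hypothetical helper: starts in 1.x, transforms in 2.x, returns to 1.x.
final class InteropRoundTrip {

    static String upperCaseViaV2(String value) {
        // A 1.x source, referenced by its fully qualified name.
        rx.Observable<String> v1 = rx.Observable.just(value);

        // 1.x -> 2.x, then apply a 2.x operator.
        io.reactivex.Observable<String> v2 =
                RxJavaInterop.toV2Observable(v1).map(String::toUpperCase);

        // 2.x -> 1.x; a 2.x Observable has no backpressure, so a strategy is required.
        rx.Observable<String> back =
                RxJavaInterop.toV1Observable(v2, BackpressureStrategy.BUFFER);

        // Block for the single value (1.x API).
        return back.toBlocking().single();
    }

    public static void main(String[] args) {
        System.out.println(upperCaseViaV2("Hello"));
    }
}
```

Using fully qualified names for both Observable types keeps the file free of import clashes while the two versions coexist.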
RxJava 2.x no longer accepts null values: passing or emitting null yields a NullPointerException, either immediately or as an onError signal to downstream.
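Both failure modes can be reproduced with a small sketch like this one, assuming RxJava 2.x is on the classpath. The class and method names are hypothetical and exist only for this illustration.

```java
import io.reactivex.Observable;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class showing the two ways a null surfaces in 2.x.
final class NullsInStream {

    // Passing null to a factory method fails immediately, before subscribing.
    static boolean justNullThrows() {
        try {
            Observable.just((String) null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    // Returning null from an operator callback is reported through onError.
    static Throwable mapToNullSignal() {
        AtomicReference<Throwable> error = new AtomicReference<>();
        Observable.just("Hello")
                .map(s -> (String) null)
                .subscribe(v -> { }, error::set);
        return error.get();
    }

    public static void main(String[] args) {
        System.out.println("just(null) throws: " + justNullThrows());
        System.out.println("map -> null signals: " + mapToNullSignal());
    }
}
```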
Find and replace all the nulls with Optional, Kotlin Unit, or another placeholder object. You can use compose with a filter-and-get transformer to drop absent Optional values from your stream:
Observable.fromIterable(getOptionalsIterable())
    .compose(filterAndGetOptional())
    .subscribe();

public static <T> ObservableTransformer<Optional<T>, T> filterAndGetOptional() {
    return upstream -> upstream.filter(Optional::isPresent).map(Optional::get);
}
Some APIs changed significantly, for example TestSubject:
The 1.x TestSubject has been dropped. Its functionality can be achieved via TestScheduler, PublishProcessor/PublishSubject and observeOn(testScheduler)/scheduler parameter.
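One way to combine these pieces is sketched below, assuming RxJava 2.x is on the classpath. The class name TestSubjectReplacement and its run() method are hypothetical. The sketch only covers the observeOn(testScheduler) variant mentioned above.

```java
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.TestScheduler;
import io.reactivex.subjects.PublishSubject;

// Hypothetical sketch: PublishSubject + TestScheduler in place of the 1.x TestSubject.
final class TestSubjectReplacement {

    // Returns the number of values observed before and after the scheduler runs.
    static int[] run() {
        TestScheduler scheduler = new TestScheduler();
        PublishSubject<String> subject = PublishSubject.create();

        // Delivery to the observer is routed through the test scheduler.
        TestObserver<String> observer = subject.observeOn(scheduler).test();

        subject.onNext("Hello");
        int before = observer.valueCount(); // the value is still queued on the scheduler

        scheduler.triggerActions();         // run everything due at the current virtual time
        int after = observer.valueCount();

        observer.assertValue("Hello");
        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] counts = run();
        System.out.println("before=" + counts[0] + ", after=" + counts[1]);
    }
}
```

The test controls exactly when the value reaches the observer, which is the behaviour TestSubject used to provide.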
To catch errors introduced by the migration:
- Manually search for nulls in the stream.
- Log all Observer::onError() callbacks.
- Use Traceur to find the source of an error when it comes from another thread.
- Use RxJavaPlugins.setErrorHandler(e -> { }); for errors that can't be emitted because the downstream's lifecycle already reached its terminal state or the downstream cancelled a sequence which was about to emit an error.
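The last point can be illustrated with a minimal sketch, assuming RxJava 2.x is on the classpath. The class name and the IOException are made up for the example. Depending on the 2.x version, the handler may receive the error wrapped in another exception, so the sketch only checks that something arrived.

```java
import io.reactivex.Observable;
import io.reactivex.plugins.RxJavaPlugins;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo: an error emitted after the terminal event reaches the global handler.
final class UndeliverableErrors {

    static Throwable captureUndeliverable() {
        AtomicReference<Throwable> captured = new AtomicReference<>();
        RxJavaPlugins.setErrorHandler(captured::set);
        try {
            Observable.<String>create(emitter -> {
                emitter.onComplete();                          // the sequence is now terminated
                emitter.onError(new IOException("too late"));  // cannot be delivered downstream
            }).subscribe(v -> { }, e -> { });
        } finally {
            RxJavaPlugins.setErrorHandler(null);               // restore the default behaviour
        }
        return captured.get();
    }

    public static void main(String[] args) {
        System.out.println("Handler received: " + captureUndeliverable());
    }
}
```

In an application, the handler would typically log the error instead of storing it. The subscriber's own onError callback is never invoked in this case.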
Resources:
- Official docs.
- Traceur - Easier RxJava2 debugging with better stacktraces.
- RxJava2Interop - Convert between RxJava 1.x and 2.x reactive types.
- Migrating to RxJava 2 (blog posts):
  http://engineering.rallyhealth.com/mobile/rxjava/reactive/2017/03/15/migrating-to-rxjava-2.html
  https://freeletics.engineering/2017/11/06/migration-to-rxjava-2.x.html