Skip to content

Instantly share code, notes, and snippets.

@RubyLichtenstein
Last active July 30, 2020 00:18
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save RubyLichtenstein/965a0e5eb5cb9c117e01719e4c47009c to your computer and use it in GitHub Desktop.
Save RubyLichtenstein/965a0e5eb5cb9c117e01719e4c47009c to your computer and use it in GitHub Desktop.
Migration To RxJava 2.x

Migration To RxJava 2.x

Introduction

In this post, I will show some points to be aware of in RxJava v1.x to v2.x migration.

The post divided to 3 sections

  • The Good

    • Naming
    • Coexistence
    • Interoperability
  • The Bad

    • Nulls In The Stream
  • The Ugly

    • Api Major Changes
    • Operator Differences
    • Catching Errors

The Good

Naming

For example functional-interfaces

  • Func1 Vs Function
rx.Observable.just("Hello")
            .map(new Func1<String, String>() {
                 @Override
                 public String call(String s) {
                    return s.toUpperCase();
            }
        });
        
io.reactivex.Observable.just("Hello")
        .map(new Function<String, String>() {
            @Override
            public String apply(String s) throws Exception {
                return s.toUpperCase();
            }
        });

Same code with lambda dose not require any changes.

io.reactivex.Observable.just("Hello")
            .map(s -> s.toUpperCase());

rx.Observable.just("Hello")
            .map(s -> s.toUpperCase());

Coexistence

v1.x and v2.x can live together since the package name is different.

import io.reactivex.Observable;
import io.reactivex.functions.Function;
import rx.Observable;
import rx.functions.Func0;

Interoperability

Convert 1.x types to 2.x types and vice versa with RxJava2Interop

import hu.akarnokd.rxjava.interop.RxJavaInterop;

// convert from 1.x to 2.x

io.reactivex.Flowable    f2 = RxJavaInterop.toV2Flowable(rx.Observable);

io.reactivex.Observable  o2 = RxJavaInterop.toV2Observable(rx.Observable);

...

// convert from 2.x to 1.x

rx.Observable  o1 = RxJavaInterop.toV1Observable(Publisher);

rx.Observable  q1 = RxJavaInterop.toV1Observable(ObservableSource, BackpressureStrategy);

...

Update libraries depending on RxJava

The bad

RxJava 2.x no longer accepts null values and the following will yield NullPointerException immediately or as a signal to downstream

Find and replace all the nulls with Optional, kotlin Unit or other object. you can use compose filter and get optional to filter out absence from your stream

Observable.fromIterable(getOptionalsIterable())
	  .compose(filterAndGetOptional())
	  .subscribe();
	  
public static <T> ObservableTransformer<Optional<T>, T> filterAndGetOptional(){
    return upstream -> upstream.filter(Optional::isPresent).map(Optional::get);
}

The ugly

Api major changes

For example TestSubject

The 1.x TestSubject has been dropped. Its functionality can be achieved via TestScheduler, PublishProcessor/PublishSubject and observeOn(testScheduler)/scheduler parameter.

Catching Errors

  • Manually search nulls in the stream.
  • Log all Observer::onError() callbacks.
  • Use Tracure to find the source of the error, when the error comes from other thread.
  • Use RxJavaPlugins.setErrorHandler(e -> { }); for errors that can't be emitted because the downstream's lifecycle already reached its terminal state or the downstream cancelled a sequence which was about to emit an error.

References

@Kishan-Cj
Copy link

Thanks a ton for this. Saves a lot of time

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment