
@alifhasnain
Last active June 21, 2021 06:56
RxJava important notes.

Comprehensive Guide by MindOrks Here

Various use cases for Observable

An Observer subscribes to an Observable. The Observable emits items, and the Observer is notified of each item, of an error, or of completion.

For ways to create an Observable, see this.

Example:

Observable.just("Niloy").subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                // do something when subscribed
            }

            @Override
            public void onNext(@NonNull String s) {
                // do something when the Observable emits a value
            }

            @Override
            public void onError(@NonNull Throwable e) {
                // do something when the Observable signals an error
            }

            @Override
            public void onComplete() {
                // do something when task is completed
            }
        });

We could also write it like this:

Observable.just("Niloy")
        .subscribe(str -> {
            // We found some result
        }, err -> {
            // Some error is thrown
        }, () -> {
            // Task is completed
        });

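Or we could handle only the result and omit the other two callbacks (any error is then routed to the global RxJavaPlugins error handler as an OnErrorNotImplementedException):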

Observable.just("Niloy")
                .subscribe(str -> {
                    // We found some result
                });

Specifying Schedulers for heavy lifting.

We can specify the Scheduler on which the source does its heavy work (subscribeOn) and the Scheduler on which the result is delivered to the Observer (observeOn):

Observable.fromCallable(() -> {
            // Do some heavy task
            return "Did some heavy stuff :)";
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(myName -> {
            System.out.println(myName);
        });

Single

Single is similar to Observable, but unlike an Observable it emits exactly one value or an error. So it is a good fit when we know the source will produce only one value.

Single.fromCallable(() -> {
    // Do some heavy task
    return "Did some heavy task :D";
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(myName -> {
    Timber.e(myName);
});

Disposing an Observer

Subscriptions should be disposed on lifecycle events. A subscription that is not disposed can keep consuming memory, network and CPU after we no longer need its results.

An example of disposing a subscription in a lifecycle event is given below:

// Inside some activity class
private Disposable disposable;

private void doSomeTask() {
    disposable = Single.fromCallable(() -> {
                // Do some heavy task
                return "My name is Niloy.";
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(myName -> {
                Timber.e(myName);
            });
}

@Override
protected void onStop() {
    // Stop the task so we don't waste resources.
    // disposable is still null if doSomeTask() was never called.
    if (disposable != null) {
        disposable.dispose();
    }
    super.onStop();
}

We can also use the CompositeDisposable class to dispose multiple subscriptions at the same time.

// we are inside some fragment
private final CompositeDisposable compositeDisposable = new CompositeDisposable();

private void sameTask() {
    Observable observable1 = ....;
    Observable observable2 = ....;

    // add() accepts a single Disposable; addAll() accepts several
    compositeDisposable.addAll(observable1.subscribe(...), observable2.subscribe(...));
}

@Override
public void onDestroyView() {
    // clear() disposes everything added so far and keeps the container reusable
    compositeDisposable.clear();
    super.onDestroyView();
}

Operators

Operators are used to transform Observables. Many operators are available, for example interval, map and flatMap. Some examples are given below:

map
Observable.just(7)
        .map(Object::toString)
        .map(s -> s + " is a prime number.")
        .subscribe(str -> {
            // prints: 7 is a prime number.
            Timber.e(str);
        });
filter
Integer[] intArr = new Integer[] { 1 , 2 , 5, -4, 8, -5 , 9 };
Observable.just(Arrays.asList(intArr))
        .flatMap(Observable::fromIterable)
        .filter(num -> num >= 0)
        .subscribe(num -> {
            Timber.e(String.valueOf(num));
        });
takeWhile
Integer[] intArr = new Integer[] { 1 , 2 , 5, -4, 8, -5 , 9 };
Observable.just(Arrays.asList(intArr))
        .flatMap(Observable::fromIterable)
        .takeWhile(num -> num >= 0)
        .subscribe(num -> {
            // stops emitting values as soon as the condition fails
            // prints 1, 2, 5
            Timber.e(String.valueOf(num));
        });
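filter and takeWhile take the same predicate, so the difference is easy to miss. The sketch below is not RxJava: it assumes Java 9+ and uses java.util.stream, whose filter and takeWhile treat a finite sequence the same way, so it runs without any extra dependency. The class and method names are made up for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper class, not part of RxJava.
final class OperatorComparison {

    // filter checks every item and keeps all the ones that match.
    static List<Integer> keepNonNegative(List<Integer> numbers) {
        return numbers.stream()
                .filter(num -> num >= 0)
                .collect(Collectors.toList());
    }

    // takeWhile stops at the first item that fails the check,
    // even if later items would have matched.
    static List<Integer> takeWhileNonNegative(List<Integer> numbers) {
        return numbers.stream()
                .takeWhile(num -> num >= 0)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> numbers = List.of(1, 2, 5, -4, 8, -5, 9);
        System.out.println(keepNonNegative(numbers));      // [1, 2, 5, 8, 9]
        System.out.println(takeWhileNonNegative(numbers)); // [1, 2, 5]
    }
}
```

With the same input as the two snippets above, filter lets 8 and 9 through, while takeWhile never reaches them.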
interval

Emits an increasing Long (0, 1, 2, ...) at a fixed time interval:

Observable.interval(1, TimeUnit.SECONDS)
        .subscribe(value -> {
            Timber.e(value.toString());
        });
delay, repeat
Observable.just("Niloy")
        .delay(1, TimeUnit.SECONDS)
        .repeat() // repeats indefinitely; repeat(n) limits how many times we repeat
        .subscribe(name -> {
            Timber.e(name);
        });
buffer, range
Observable.range(0, 10)
    .buffer(4)
    .subscribe((List<Integer> buffer) -> System.out.println(buffer));

// prints:
// [0, 1, 2, 3]
// [4, 5, 6, 7]
// [8, 9]
zip see example from here
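As a rough mental model of zip, here is a plain-Java sketch with no RxJava dependency. It is not how RxJava implements the operator, and ZipSketch is a made-up name. It only shows the pairing rule: the n-th item of one source is combined with the n-th item of the other, and the output ends when either source runs out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical illustration class; this is not RxJava's implementation.
final class ZipSketch {

    // Combines the n-th item of each source with the given function and
    // stops as soon as either source runs out of items.
    static <A, B, R> List<R> zip(List<A> first, List<B> second,
                                 BiFunction<A, B, R> combiner) {
        List<R> result = new ArrayList<>();
        Iterator<A> a = first.iterator();
        Iterator<B> b = second.iterator();
        while (a.hasNext() && b.hasNext()) {
            result.add(combiner.apply(a.next(), b.next()));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> zipped = zip(
                List.of(1, 2, 3),
                List.of("a", "b"),
                (number, letter) -> number + letter);
        System.out.println(zipped); // [1a, 2b]
    }
}
```

In RxJava the same pairing is done asynchronously by Observable.zip(source1, source2, zipper), which is what the "Parallel network request" use case relies on.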

Use cases

Chaining Observable/Single
Single.fromObservable(getFirstIntegerObservable())
                .subscribeOn(Schedulers.io())
                .flatMap(new Function<Integer, SingleSource<Integer>>() {
                    @Override
                    public SingleSource<Integer> apply(Integer value) throws Throwable {
                        // here `value` is the result from the first call
                        return Single.fromObservable(getSecondIntegerObservable());
                    }
                })
                // switch to the main thread only after both calls, so the
                // second call is not subscribed on the main thread
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(secondCall -> {
                    Timber.e("Value emitted: %s", String.valueOf(secondCall));
                });
Parallel network request
Observable.zip(
        repo.fetchPostWithId(postID).subscribeOn(Schedulers.io()),
        repo.fetchCommentWithPostID(postID).subscribeOn(Schedulers.io()),
        (post, comments) -> {
            singlePostLiveData.postValue(post);
            commentLiveData.postValue(comments);
            // RxJava 2+ does not allow null items, so return a non-null placeholder
            return true;
        }
).subscribe(obj -> {}, Timber::e);
Make parallel requests from a list source, then convert the results to a list
Observable.create((ObservableOnSubscribe<List<String>>) emitter -> {
            List<String> nameList = new ArrayList<>();
            nameList.add("Niloy");
            nameList.add("Nipun");
            nameList.add("Ananna");
            nameList.add("Azmain");
            nameList.add("Atif");
            emitter.onNext(nameList);
            emitter.onComplete();
        })
                .subscribeOn(Schedulers.io())
                .flatMapIterable(new Function<List<String>, Iterable<String>>() {
                    /**
                     * Flattens the list so that each name is
                     * emitted as a separate item
                     */
                    @Override
                    public Iterable<String> apply(List<String> nameStrings) throws Throwable {
                        return nameStrings;
                    }
                })
                .flatMap(new Function<String, ObservableSource<String>>() {
                    /**
                     * flatMap subscribes to every inner Observable without waiting,
                     * so the delays run concurrently and results may arrive out of order
                     */
                    @Override
                    public ObservableSource<String> apply(String name) throws Throwable {
                        Random rand = new Random();
                        return Observable.just("My name is " + name).delay(rand.nextInt(6), TimeUnit.SECONDS);
                    }
                })
                // Once the Observable completes, toList collects the items into a Single<List<String>>
                .toList()
                // delay() emits on the computation scheduler, so switch to the
                // main thread last, just before the list is delivered
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(nameList -> {
                    for (String name : nameList) {
                        Timber.e(name);
                    }
                });
Perform a network request repeatedly at a fixed interval, with an initial delay of 0 for the first request (Kotlin).
Observable.interval(0 , 5, TimeUnit.SECONDS)
            .flatMap { Observable.fromSingle(getBaseUrl()) }
            .flatMap { Observable.fromSingle(getMatchData(matchId)) }
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe({ timedData ->
                populateViewWithData(timedData.value())
                floatingViewBinding.progressBar.visibility = View.GONE
            }, { onError ->
                Timber.e(onError)
            })

Combine sequential and parallel network requests (Kotlin).

Observable.fromSingle(getBaseUrl())
    .flatMap {
        // Every 8 seconds, make both network requests in parallel
        // and emit their results as a pair
        Observable.interval(0, 8, TimeUnit.SECONDS)
            .flatMap {
                Observable.zip(
                    Observable.fromSingle(getMatchInfo(matchId)),
                    Observable.fromSingle(getMatchLiveInfo(matchId)),
                    { matchInfo, matchLiveData -> matchInfo to matchLiveData }
                )
            }
    }
    // interval emits on the computation scheduler, so switch to the
    // main thread before touching views
    .observeOn(AndroidSchedulers.mainThread())
    .doOnError {
        Timber.e(it)
    }
    .subscribe({ (matchInfo, matchLiveData) ->
        populateViewWithData(matchInfo, matchLiveData)
        floatingViewBinding.progressBar.visibility = View.GONE
    }, { onError ->
        Timber.e(onError)
    })