Comprehensive Guide by MindOrks Here
Observable
's subscribe to an observer. Observer
emits the state of the Observable
.
For Observable
creation. See this
Example:
Observable.just("Niloy").subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
// do something when subscribed
}
@Override
public void onNext(@NonNull String s) {
// do something when observer emits a value
}
@Override
public void onError(@NonNull Throwable e) {
// do something when observer thrown an error
}
@Override
public void onComplete() {
// do something when task is completed
}
});
We could also write it lie this:
Observable.just("Niloy")
.subscribe(str -> {
// We found some result
}, err -> {
// Some error is thrown
}, () -> {
// Task is completed
});
Or we could just get the result and leave other two blank:
Observable.just("Niloy")
.subscribe(str -> {
// We found some result
});
We can specify Schedulers
where the Observer
will subscribe and do the heavy works and also where to dispatch the result:
Observable.fromCallable(() -> {
// Do some heavy task
return "Did some heavy stuff :)";
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(myName -> {
System.out.println(myName);
});
Single
is mostly similar to Observable
but the distinction is that unlike Observer
it only emits a single value just once. So, it is wise to use Single
where we know that we will only get a single value and do the job.
Single.fromCallable(() -> {
// Do some heavy task
return "Did some heavy task :D";
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(myName -> {
Timber.e(myName);
});
Disposing Observer
s is necessary on lifecycle
events. If we don't dispose an Observer
it could consume our memory, network and CPU when we don't need it to.
An example of disposing Observer
in lifecycle event is given below:
// Inside some activity class
Disposable disposable;
private void doSomeTask() {
disposable = Single.fromCallable(() -> {
// Do some heavy task
return "My name is Niloy.";
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(myName -> {
Timber.e(myName);
});
}
@Override
protected void onStop() {
/*
* stop tasks so we
* don't have to waste our
* resources
* */
disposable.dispose();
super.onStop();
}
We can also use CompositeDisposable
class to stop multiple Observable
at the same time.
// we are inside some fragment
private CompositeDisposable compositeDisposable = new CompositeDisposable();
private void sameTask() {
Observable observable1 = ....;
Observable observable2 = ....;
compositeDisposable.add(observable1.subscribe(...), cbservable2.subscribe(...));
}
@Override
protected void onDestroyView() {
compositeDisposable.clear();
super.onDestroyView();
}
Operators are used for Transforming Observables. There are several operator available some of them are interval
, map
, flatMap
etc. Some example are given below:
Observable.just(7)
.map(Object::toString)
.map(s -> s + " is a prime number.")
.subscribe(str -> {
// prints 7 is a prime number
Timber.e(str);
});
Integer[] intArr = new Integer[] { 1 , 2 , 5, -4, 8, -5 , 9 };
Observable.just(Arrays.asList(intArr))
.flatMap(Observable::fromIterable)
.filter(num -> num >= 0)
.subscribe(num -> {
Timber.e(String.valueOf(num));
});
Integer[] intArr = new Integer[] { 1 , 2 , 5, -4, 8, -5 , 9 };
Observable.just(Arrays.asList(intArr))
.flatMap(Observable::fromIterable)
.takeWhile(num -> num >= 0)
.subscribe(num -> {
/*
* stops emiting value when condition fails
* prints 1,2,5
* */
Timber.e(String.valueOf(num));
});
Dispatches update after specific time interval
Observable.interval(1, TimeUnit.SECONDS)
.subscribe(value -> {
Timber.e(value.toString());
});
Observable.just("Niloy")
.delay(1, TimeUnit.SECONDS)
.repeat() // we can specify how may times we want to repeat
.subscribe(name -> {
Timber.e(name);
});
Observable.range(0, 10)
.buffer(4)
.subscribe((List<Integer> buffer) -> System.out.println(buffer));
// prints:
// [0, 1, 2, 3]
// [4, 5, 6, 7]
// [8, 9]
zip
see example from here
Single.fromObservable(getFirstIntegerObservable())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.flatMap(new Function<Integer, SingleSource<Integer>>() {
@Override
public SingleSource<Integer> apply(Integer value) throws Throwable {
// here `value` is the result from first call
return Single.fromObservable(getSecondIntegerObservable());
}
})
.subscribe(secondCall -> {
Timber.e("Value emitted: %s", String.valueOf(secondCall));
});
Observable.zip(
repo.fetchPostWithId(postID).subscribeOn(Schedulers.io()),
repo.fetchCommentWithPostID(postID).subscribeOn(Schedulers.io()),
(post, comments) -> {
singlePostLiveData.postValue(post);
commentLiveData.postValue(comments);
return null;
}
).subscribe(obj -> {}, Timber::e);
Observable.create((ObservableOnSubscribe<List<String>>) emitter -> {
List<String> nameList = new ArrayList<>();
nameList.add("Niloy");
nameList.add("Nipun");
nameList.add("Ananna");
nameList.add("Azmain");
nameList.add("Atif");
emitter.onNext(nameList);
emitter.onComplete();
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.flatMapIterable(new Function<List<String>, Iterable<String>>() {
/**
* This method converts list into a
* single item source
* */
@Override
public Iterable<String> apply(List<String> nameStrings) throws Throwable {
return nameStrings;
}
})
.flatMap(new Function<String, ObservableSource<String>>() {
/**
* flatMap resolves multiple Observables concurrently
* */
@Override
public ObservableSource<String> apply(String name) throws Throwable {
Random rand = new Random();
return Observable.just("My name is " + name).delay(rand.nextInt(6), TimeUnit.SECONDS);
}
})
/**
* When observable completes toList converts them to a Single<List<*>>
* */
.toList()
.subscribe(nameList -> {
for (String name : nameList) {
Timber.e(name);
}
});
Observable.interval(0 , 5, TimeUnit.SECONDS)
.flatMap { Observable.fromSingle(getBaseUrl()) }
.flatMap { Observable.fromSingle(getMatchData(matchId)) }
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({ timedData ->
populateViewWithData(timedData.value())
floatingViewBinding.progressBar.visibility = View.GONE
}, { onError ->
Timber.e(onError)
})
Observable.fromSingle(getBaseUrl())
.flatMap {
/*
* Emits value by making network request in parallel in every
* 8 seconds
* */
Observable.interval(0, 8, TimeUnit.SECONDS)
.flatMap {
Observable.zip(
Observable.fromSingle(getMatchInfo(matchId)),
Observable.fromSingle(getMatchLiveInfo(matchId)),
{ matchInfo, matchLiveData -> matchInfo to matchLiveData }
)
}
}
.doOnError {
Timber.e(it)
}
.subscribe({ (matchInfo, matchLiveData) ->
populateViewWithData(matchInfo, matchLiveData)
floatingViewBinding.progressBar.visibility = View.GONE
}, { onError ->
Timber.e(onError)
})