Skip to content

Instantly share code, notes, and snippets.

@MartinHarkins
Last active January 11, 2016 16:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save MartinHarkins/ece31c41e436fa4cdc44 to your computer and use it in GitHub Desktop.
Save MartinHarkins/ece31c41e436fa4cdc44 to your computer and use it in GitHub Desktop.
Late Subscribing and Polling with ReactiveX (http://blog.grio.com/2016/01/rxandroid-polling-and-other.html)
ConnectableObservable<Wrapper<List<Data>>> dataObservable;
public Observable<Wrapper<List<Data>>> getData() {
if (dataObservable == null) {
dataObservable = webService.getApiData()
.compose((Observable.Transformer<List<Data>, List<Data>>) networkScheduling)
.map(dataList -> Wrapper.response(dataList))
// Retry a few times before letting an error through.
.retry(5)
.onErrorResumeNext(throwable -> {
// If the server is unreachable, let the subscriber know.
// The user may be able to do something about his connection.
if (throwable instanceof UnknownHostException || throwable instanceof ...) {
// Recoverable error - transmit error without breaking.
return Observable.just(Wrapper.error(throwable));
}
// In case of an unrecoverable error, let it end.
return Observable.error(throwable);
})
.repeatWhen(observable ->
Observable.interval(5, TimeUnit.MINUTES)
.mergeWith(hook)
)
.replay(1);
dataObservable.connect();
}
return dataObservable;
}
public void triggerDataFetch() {
hook.onNext(-1L);
}
Observable.Transformer networkScheduling = observable ->
observable.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
// Consumer
dataService.getData().
subscribe(wrapper -> {
if (wrapper.hasError()) {
// handle error
} else {
List<Data> list = wrapper.response;
// do something.
}
}, e -> logger.error("Unrecoverable error getting data.", e));
public interface Webservice {
@GET("api/data")
public Observable<List<Data>> getApiData();
}
public static class Wrapper<ResponseType> {
public Throwable error;
public ResponseType response;
private Wrapper() { }
public static <T> Wrapper<T> error(Throwable t) {
Wrapper<T> wrapper = new Wrapper<T>();
wrapper.error = t;
return wrapper;
}
public static <T> Wrapper<T> response(T response) {
Wrapper<T> wrapper = new Wrapper<T>();
wrapper.response = response;
return wrapper;
}
public boolean hasError() { return error != null; }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment