Last active
January 11, 2016 16:52
-
-
Save MartinHarkins/ece31c41e436fa4cdc44 to your computer and use it in GitHub Desktop.
Late Subscribing and Polling with ReactiveX (http://blog.grio.com/2016/01/rxandroid-polling-and-other.html)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
ConnectableObservable<Wrapper<List<Data>>> dataObservable; | |
public Observable<Wrapper<List<Data>>> getData() { | |
if (dataObservable == null) { | |
dataObservable = webService.getApiData() | |
.compose((Observable.Transformer<List<Data>, List<Data>>) networkScheduling) | |
.map(dataList -> Wrapper.response(dataList)) | |
// Retry a few times before letting an error through. | |
.retry(5) | |
.onErrorResumeNext(throwable -> { | |
// If the server is unreachable, let the subscriber know. | |
// The user may be able to do something about his connection. | |
if (throwable instanceof UnknownHostException || throwable instanceof ...) { | |
// Recoverable error - transmit error without breaking. | |
return Observable.just(Wrapper.error(throwable)); | |
} | |
// In case of an unrecoverable error, let it end. | |
return Observable.error(throwable); | |
}) | |
.repeatWhen(observable -> | |
Observable.interval(5, TimeUnit.MINUTES) | |
.mergeWith(hook) | |
) | |
.replay(1); | |
dataObservable.connect(); | |
} | |
return dataObservable; | |
} | |
public void triggerDataFetch() { | |
hook.onNext(-1L); | |
} | |
Observable.Transformer networkScheduling = observable -> | |
observable.subscribeOn(Schedulers.io()) | |
.unsubscribeOn(Schedulers.io()) | |
.observeOn(AndroidSchedulers.mainThread()); | |
// Consumer | |
dataService.getData(). | |
subscribe(wrapper -> { | |
if (wrapper.hasError()) { | |
// handle error | |
} else { | |
List<Data> list = wrapper.response; | |
// do something. | |
} | |
}, e -> logger.error("Unrecoverable error getting data.", e)); | |
public interface Webservice { | |
@GET("api/data") | |
public Observable<List<Data>> getApiData(); | |
} | |
public static class Wrapper<ResponseType> { | |
public Throwable error; | |
public ResponseType response; | |
private Wrapper() { } | |
public static <T> Wrapper<T> error(Throwable t) { | |
Wrapper<T> wrapper = new Wrapper<T>(); | |
wrapper.error = t; | |
return wrapper; | |
} | |
public static <T> Wrapper<T> response(T response) { | |
Wrapper<T> wrapper = new Wrapper<T>(); | |
wrapper.response = response; | |
return wrapper; | |
} | |
public boolean hasError() { return error != null; } | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment