Skip to content

Instantly share code, notes, and snippets.

@codeprogression
Last active January 5, 2016 16:11
Show Gist options
  • Save codeprogression/00f186f5a8bb4e0b7746 to your computer and use it in GitHub Desktop.
Save codeprogression/00f186f5a8bb4e0b7746 to your computer and use it in GitHub Desktop.
Encapsulate polling an endpoint using RxJava, OkHttp, and Gson
import com.google.gson.Gson;
import com.squareup.okhttp.OkHttpClient;
import javax.inject.Inject;
import javax.inject.Singleton;
@Singleton
public class ExamplePoller extends RxPoller<Example> {
@Inject
public ExamplePoller(OkHttpClient client, Gson gson) {
super(client, gson);
}
public void setListener(RxPoller.Configuration<Example> configuration) {
super.configure(listener, configuration.class);
}
}
import android.util.Pair;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.schedulers.Schedulers;
import rx.subjects.BehaviorSubject;
@Singleton
public class ExampleProvider implements RxPoller.Configuration<Example>{
public static final Pair<Integer, TimeUnit> POLLING_INTERVAL = new Pair<>(5, TimeUnit.MINUTES);
private final ExamplePoller poller;
private final String exampleUrl;
private Subscription subscription;
private BehaviorSubject<Example> exampleSubject;
@Inject
public ExampleProvider(ExamplePoller poller, @Named("EXAMPLE_URL") String exampleUrl){
this.exampleUrl = exampleUrl;
this.poller = poller;
poller.configure(this);
exampleSubject = BehaviorSubject.create(new Example());
startPolling();
}
private void startPolling() {
subscription = Observable.create(this.poller)
.subscribe(new Subscriber<Example>() {
@Override public void onCompleted() {
exampleSubject.onCompleted();
}
@Override public void onError(Throwable e) {
exampleSubject.onError(e);
}
@Override public void onNext(Example example) {
exampleSubject.onNext(example);
}
});
}
public synchronized Observable<Example> getObservable() {
if (subscription == null || subscription.isUnsubscribed()) {
startPolling();
}
return exampleSubject.asObservable()
.subscribeOn(Schedulers.io())
.doOnUnsubscribe(new Action0() {
@Override public void call() {
poller.cancel();
subscription.unsubscribe();
}
});
}
@Override
public int getMaxRetries() {
return 10;
}
@Override public Pair<Integer, TimeUnit> getPollingInterval(Example result) {
return POLLING_INTERVAL;
}
@Override
public String getUrl() {
return examplenUrl;
}
@Override public boolean enableNulls() {
return false;
}
public void update(Example example) {
exampleSubject.onNext(example);
}
}
import javax.inject.Inject;
import rx.Subscriber;
public class ExampleClient {
ExampleProvider provider;
@Inject
public ExampleClient(ExampleProvider provider){
this.provider = provider;
}
public void doSomething(){
provider.getObservable()
.subscribe(new Subscriber<Example>(){
@Override public void onCompleted(){}
@Override public void onError(Throwable e){}
@Override public void onNext(Example example){
// do something with example object
}
});
}
}
import android.text.TextUtils;
import android.util.Pair;
import com.google.gson.Gson;
import com.squareup.okhttp.Call;
import com.squareup.okhttp.Callback;
import com.squareup.okhttp.OkHttpClient;
import com.squareup.okhttp.Request;
import com.squareup.okhttp.Response;
import java.io.IOException;
import java.net.SocketException;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Action0;
import rx.schedulers.Schedulers;
import rx.subscriptions.Subscriptions;
import timber.log.Timber;
public class RxPoller<T> implements Observable.OnSubscribe<T> {
private final OkHttpClient client;
private final Gson gson;
private int retry = 0;
protected Configuration<T> configuration;
protected Class<?> resultClass;
public Pair<Integer, TimeUnit> pollingInterval;
private String lastModified;
private Scheduler.Worker worker;
private Action0 action;
private String etag;
protected String responseBody;
private Call call;
protected Subscriber<? super T> subscriber;
public RxPoller(OkHttpClient client, Gson gson) {
this.client = client;
this.gson = gson;
}
public void configure(Configuration<T> configuration, Class<?> resultClass) {
this.configuration = configuration;
this.resultClass = resultClass;
}
public Subscriber<? super T> getSubscriber(){
return subscriber;
}
@Override
public void call(final Subscriber<? super T> subscriber) {
this.subscriber = subscriber;
if (worker == null || worker.isUnsubscribed()) {
worker = Schedulers.io().createWorker();
worker.schedule(getAction());
} else if (!TextUtils.isEmpty(responseBody)) {
subscriber.onNext(getResult());
}
}
protected T getResult() {
return (T) gson.fromJson(responseBody, resultClass);
}
protected Action0 getAction() {
if (action == null) {
action = new Action0() {
@Override public void call() {
try {
String url = configuration.getUrl();
if (url == null) {
Timber.d("URL is null for %s, delaying 15 seconds", resultClass.getSimpleName());
if (responseBody == null) {
pollingInterval = new Pair<>(15, TimeUnit.SECONDS);
}
scheduleWorker();
return;
}
Timber.d("Getting " + resultClass.getSimpleName() + " from NETWORK");
Request.Builder request = new Request.Builder()
.url(url)
.get();
if (lastModified != null)
request.addHeader("If-Modified-Since", lastModified);
if (etag != null)
request.addHeader("If-None-Match", etag);
call = client.newCall(request.build());
call.enqueue(PollingSubscriber.this.getCallback());
} catch (Exception e) {
if (subscriber != null){
subscriber.onError(e);
}
Timber.d(e, "Error while polling");
}
}
}
;
}
return action;
}
private Callback getCallback() {
return new Callback() {
@Override
public void onFailure(Request request, IOException e) {
if ("Canceled".equals(e.getMessage())) return;
handleFailure(e);
}
@Override
public void onResponse(Response response) throws IOException {
Throwable error = getFailureExceptionOnBadStatus(response);
if (error != null) {
handleFailure(error);
return;
}
String lastModified = response.header("Last-Modified");
String etag = response.header("Etag");
retry = 0;
if (isModified(etag)) {
Timber.d("Received response - LAST-MODIFIED: " + lastModified);
Timber.d("Received response - ETAG: " + etag);
T result;
try {
//noinspection unchecked
result = (T) gson.fromJson(response.body().charStream(), resultClass);
responseBody = gson.toJson(result);
} catch (Exception e) {
if ((e instanceof SocketException)) {
Timber.e(e, "");
// scheduleWorker();
} else {
handleFailure(e);
}
return;
}
Pair<Integer, TimeUnit> interval = configuration.getPollingInterval(result);
pollingInterval = interval;
if (result != null || configuration.enableNulls()) {
if (subscriber != null){
subscriber.onNext(result);
}
}
RxPoller.this.lastModified = lastModified;
RxPoller.this.etag = etag;
}
Timber.d("Next poll in %s %s",
Integer.toString(pollingInterval.first),
pollingInterval.second.toString());
scheduleWorker();
}
};
}
public void kick() {
if (worker != null){
worker.unsubscribe();
}
worker = Schedulers.io().createWorker();
worker.schedule(getAction());
}
protected void scheduleWorker() {
if ( worker == null || worker.isUnsubscribed()) {
worker = Schedulers.io().createWorker();
}
worker.schedule(getAction(), pollingInterval.first, pollingInterval.second);
}
protected void sendCachedResult() {
Timber.d("Getting " + resultClass.getName() + " from CACHE");
T result = getResult();
if (subscriber != null){
subscriber.onNext(result);
}
scheduleWorker();
}
private boolean isModified(String etag) {
boolean etagMatches = this.etag != null && this.etag.equals(etag);
if (etagMatches) Timber.d("ETAG match, no changes: " + etag);
return !etagMatches;
}
private void handleFailure(Throwable e) {
Timber.e("Error received: ", e);
if (!suppressFailureRetry()) {
pollingInterval = new Pair<>(10, TimeUnit.SECONDS);
Timber.d("Setting polling time to: " + pollingInterval);
retry++;
if (retry > configuration.getMaxRetries()) {
if (subscriber != null) {
subscriber.onError(e);
}
return;
}
}
scheduleWorker();
}
protected boolean suppressFailureRetry() {
return false;
}
private Throwable getFailureExceptionOnBadStatus(Response resp) {
if (resp.code() < 399) return null;
String message = String.format("Polling subscriber received bad response for: %s\n\t%d %s",
resp.request().urlString(), resp.code(), resp.body());
return new Exception(message);
}
private void cancelExistingCalls(final Call call, Subscriber<? super T> subscriber) {
subscriber.add(Subscriptions.create(new Action0() {
@Override public void call() {
call.cancel();
}
}));
}
public void cancel() {
if (worker != null) {
worker.unsubscribe();
}
new Thread(new Runnable() {
@Override public void run() {
if (call == null || subscriber == null) return;
cancelExistingCalls(call, subscriber);
}
}).start();
}
public Class<?> getResultClass() {
return resultClass;
}
public interface Configuration<T> {
int getMaxRetries();
Pair<Integer, TimeUnit> getPollingInterval(T result);
String getUrl();
boolean enableNulls();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment