Last active
February 11, 2020 09:23
-
-
Save thanksmister/76d8ed3385b9f9f86e74 to your computer and use it in GitHub Desktop.
RxJava Polling Example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// data sevice class calling API and returning list of exchanges | |
public class DataService | |
{ | |
private PublishSubject<List<Exchange>> exchangesRequest; | |
public DataService() | |
{} | |
// Get exchanges without any polling which returns results to subscription on time | |
public Subscription getExchanges(final Observer<List<Exchange>> observer) | |
{ | |
// join the request if instantiated | |
if(exchangesRequest != null) { | |
return exchangesRequest.subscribe(observer); | |
} | |
exchangesRequest = PublishSubject.create(); | |
exchangesRequest.subscribe(new Observer<List<Exchange>>() { | |
@Override | |
public void onCompleted() { | |
} | |
@Override | |
public void onError(Throwable throwable) { | |
} | |
@Override | |
public void onNext(List<Exchange> exchanges) { | |
// Cache data in database | |
} | |
}); | |
Subscription subscription = exchangesRequest.subscribe(observer); | |
getExchangesObservable() | |
.subscribeOn(Schedulers.newThread()) | |
.observeOn(AndroidSchedulers.mainThread()) | |
.subscribe(exchangesRequest); | |
return subscription; | |
} | |
// Get exchanges on a timer every 5 minutes, problem with this is stops on error and emmits two results every time it runs | |
public Subscription getExchangesTimer(final Observer<List<Exchange>> observer) | |
{ | |
// join the request if instantiated | |
if(exchangesRequest != null) { | |
return exchangesRequest.subscribe(observer); | |
} | |
exchangesRequest = PublishSubject.create(); | |
exchangesRequest.subscribe(new Observer<List<Exchange>>() { | |
@Override | |
public void onCompleted() { | |
} | |
@Override | |
public void onError(Throwable throwable) { | |
} | |
@Override | |
public void onNext(List<Exchange> exchanges) { | |
// Cache data in database | |
} | |
}); | |
// TODO this returns twice for each time it runs, also stops polling on error | |
Subscription subscription = exchangesRequest.subscribe(observer); | |
Observable.timer(0, (2 * 60 * 1000), TimeUnit.MILLISECONDS).timeInterval() | |
.flatMap(new Func1<TimeInterval<Long>, Observable<List<Exchange>>>() { | |
@Override | |
public Observable<List<Exchange>> call(TimeInterval<Long> longTimeInterval) { | |
return getExchangesObservable(); | |
} | |
}) | |
.subscribeOn(Schedulers.newThread()) | |
.observeOn(AndroidSchedulers.mainThread()) | |
.subscribe(exchangesRequest); | |
return subscription; | |
} | |
// get exchanges scheduled periodically, problem with this approach is runs continuously ignoring interval and explodes application | |
public Subscription getExchangesPeriodically(final Observer<List<Exchange>> observer) | |
{ | |
// join the request if instantiated | |
if(exchangesRequest != null) { | |
return exchangesRequest.subscribe(observer); | |
} | |
exchangesRequest = PublishSubject.create(); | |
exchangesRequest.subscribe(new Observer<List<Exchange>>() { | |
@Override | |
public void onCompleted() { | |
} | |
@Override | |
public void onError(Throwable throwable) { | |
} | |
@Override | |
public void onNext(List<Exchange> exchanges) { | |
// Cache data in database | |
} | |
}); | |
Subscription subscription = exchangesRequest.subscribe(observer); | |
Scheduler.Worker worker = Schedulers.newThread().createWorker(); | |
worker.schedulePeriodically(new Action0() { | |
@Override | |
public void call() { | |
getExchangesObservable() | |
.subscribeOn(Schedulers.newThread()) | |
.observeOn(AndroidSchedulers.mainThread()) | |
.subscribe(exchangesRequest); | |
} | |
}, 0, (2 * 60 * 1000), TimeUnit.MILLISECONDS); | |
return subscription; | |
} | |
// Uses a Schedulers.io().createWorker() to run the service call periodically. | |
public Subscription getExchangesWorker(final Observer<List<Exchange>> observer) | |
{ | |
// join the request if instantiated | |
if(exchangesRequest != null) { | |
return exchangesRequest.subscribe(observer); | |
} | |
exchangesRequest = PublishSubject.create(); | |
exchangesRequest.subscribe(new Observer<List<Exchange>>() { | |
@Override | |
public void onCompleted() { | |
} | |
@Override | |
public void onError(Throwable throwable) { | |
} | |
@Override | |
public void onNext(List<Exchange> exchanges) { | |
// Cache data in database | |
} | |
}); | |
Subscription subscription = exchangesRequest.subscribe(observer); | |
Observable.create(new Observable.OnSubscribe<List<Exchange>>() | |
{ | |
@Override | |
public void call(Subscriber<? super List<Exchange>> subscriber) | |
{ | |
Schedulers.io().createWorker() | |
.schedulePeriodically(new Action0() | |
{ | |
@Override | |
public void call() | |
{ | |
getExchangesObservable().doOnNext(new Action1<List<Exchange>>() | |
{ | |
@Override | |
public void call(List<Exchange> exchanges) | |
{ | |
subscriber.onNext(exchanges); | |
} | |
}).doOnError(new Action1<Throwable>() | |
{ | |
@Override | |
public void call(Throwable throwable) | |
{ | |
if(throwable != null) | |
subscriber.onError(throwable); | |
} | |
}).subscribe(); | |
} | |
}, 500, CHECK_EXCHANGE_DATA, TimeUnit.MILLISECONDS); | |
} | |
}) | |
.subscribeOn(Schedulers.newThread()) | |
.observeOn(AndroidSchedulers.mainThread()) | |
.subscribe(exchangesRequest); | |
return subscription; | |
} | |
private Observable<List<Exchange>> getExchangesObservable() | |
{ | |
BitcoinAverage bitcoinAverage = provideBitcoinAverage(); | |
return bitcoinAverage.exchanges(USD) | |
} | |
private BitcoinAverage provideBitcoinAverage() | |
{ | |
RestAdapter restAdapter = new RestAdapter.Builder() | |
.setEndpoint("https://api.bitcoinaverage.com") | |
.build(); | |
return restAdapter.create(BitcoinAverage.class); | |
} | |
private interface BitcoinAverage | |
{ | |
@GET("/exchanges/{currency}") | |
Observable<List<Exchange>> exchanges(@Path("currency") String currency); | |
} | |
public class Exchange | |
{ | |
public String display_name; | |
public String ask; | |
public String bid; | |
public String last; | |
public String source; | |
public float volume_btc; | |
public float volume_percent; | |
public String blue_bid; | |
public String blue_ask; | |
public String official_bid; | |
public String official_ask; | |
public String date; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class ExchangeView extends FrameLayout | |
{ | |
private Subscription subscription; | |
private DataService dataService; | |
public ExchangeView(Context context, AttributeSet attrs) | |
{ | |
super(context, attrs); | |
dataService = new DataServices(); | |
} | |
@Override | |
protected void onFinishInflate() | |
{ | |
super.onFinishInflate(); | |
getExchangeData(); | |
} | |
@Override | |
protected void onDetachedFromWindow() | |
{ | |
super.onDetachedFromWindow(); | |
if(subscription != null) | |
subscription.unsubscribe(); | |
} | |
public void getExchangeData() | |
{ | |
// should update exchanges on interval, handle errors, and be able to unsubscribe | |
subscription = dataService.getExchangesTimer(new Observer<List<Exchange>>() | |
{ | |
@Override | |
public void onNext(List<Exchange> exchanges) { | |
// TODO update exchanges on view | |
} | |
@Override | |
public void onCompleted() { | |
} | |
@Override | |
public void onError(Throwable throwable) { | |
if (throwable instanceof RetrofitError) { | |
if (((RetrofitError) throwable).isNetworkError()) { | |
// TODO handle network error | |
} else { | |
// TODO handle service error | |
} | |
} | |
} | |
} | |
} | |
} |
Added getExchangesWorker method for using a scheduled worker within an observable for hitting the API periodically. This method seems to be working until I encounter an error then it stops emitting data.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I have looked at the notes here (ReactiveX/RxJava#448) but its not clear how to return the subscription back to view and the method outlined for using a Scheduler are now deprecated in latest RxJava:
Observable.create({ observer ->
Schedulers.newThread().schedulePeriodically({
observer.onNext("application-state-from-network");
}, 0, 1000, TimeUnit.MILLISECONDS);
}).take(10).subscribe({ v -> println(v) });
It's also hard to reverse engineer a lambda without being familiar with the original.