@thanksmister
Last active February 11, 2020 09:23
RxJava Polling Example
// Data service class that calls the API and returns a list of exchanges
public class DataService
{
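private PublishSubject<List<Exchange>> exchangesRequest;
// Polling period used by getExchangesWorker. Not defined in the original gist;
// assumed here to match the 2-minute interval used by the other methods.
private static final long CHECK_EXCHANGE_DATA = 2 * 60 * 1000;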
public DataService()
{}
// Get exchanges once, without any polling; results are delivered to the subscriber a single time
public Subscription getExchanges(final Observer<List<Exchange>> observer)
{
// join the request if instantiated
if(exchangesRequest != null) {
return exchangesRequest.subscribe(observer);
}
exchangesRequest = PublishSubject.create();
exchangesRequest.subscribe(new Observer<List<Exchange>>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onNext(List<Exchange> exchanges) {
// Cache data in database
}
});
Subscription subscription = exchangesRequest.subscribe(observer);
getExchangesObservable()
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(exchangesRequest);
return subscription;
}
// Get exchanges on a timer (every 2 minutes as written below). Problems: polling stops on error, and it emits two results every time it runs
public Subscription getExchangesTimer(final Observer<List<Exchange>> observer)
{
// join the request if instantiated
if(exchangesRequest != null) {
return exchangesRequest.subscribe(observer);
}
exchangesRequest = PublishSubject.create();
exchangesRequest.subscribe(new Observer<List<Exchange>>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onNext(List<Exchange> exchanges) {
// Cache data in database
}
});
// TODO this returns twice for each time it runs, also stops polling on error
Subscription subscription = exchangesRequest.subscribe(observer);
Observable.timer(0, (2 * 60 * 1000), TimeUnit.MILLISECONDS).timeInterval()
.flatMap(new Func1<TimeInterval<Long>, Observable<List<Exchange>>>() {
@Override
public Observable<List<Exchange>> call(TimeInterval<Long> longTimeInterval) {
return getExchangesObservable();
}
})
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(exchangesRequest);
return subscription;
}
// Get exchanges scheduled periodically. Problem: this runs continuously, ignoring the interval, and eventually crashes the application
public Subscription getExchangesPeriodically(final Observer<List<Exchange>> observer)
{
// join the request if instantiated
if(exchangesRequest != null) {
return exchangesRequest.subscribe(observer);
}
exchangesRequest = PublishSubject.create();
exchangesRequest.subscribe(new Observer<List<Exchange>>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onNext(List<Exchange> exchanges) {
// Cache data in database
}
});
Subscription subscription = exchangesRequest.subscribe(observer);
Scheduler.Worker worker = Schedulers.newThread().createWorker();
worker.schedulePeriodically(new Action0() {
@Override
public void call() {
getExchangesObservable()
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(exchangesRequest);
}
}, 0, (2 * 60 * 1000), TimeUnit.MILLISECONDS);
return subscription;
}
// Uses a Schedulers.io().createWorker() to run the service call periodically.
public Subscription getExchangesWorker(final Observer<List<Exchange>> observer)
{
// join the request if instantiated
if(exchangesRequest != null) {
return exchangesRequest.subscribe(observer);
}
exchangesRequest = PublishSubject.create();
exchangesRequest.subscribe(new Observer<List<Exchange>>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onNext(List<Exchange> exchanges) {
// Cache data in database
}
});
Subscription subscription = exchangesRequest.subscribe(observer);
Observable.create(new Observable.OnSubscribe<List<Exchange>>()
{
@Override
// final so the anonymous inner classes below can reference it (required before Java 8)
public void call(final Subscriber<? super List<Exchange>> subscriber)
{
Schedulers.io().createWorker()
.schedulePeriodically(new Action0()
{
@Override
public void call()
{
getExchangesObservable().doOnNext(new Action1<List<Exchange>>()
{
@Override
public void call(List<Exchange> exchanges)
{
subscriber.onNext(exchanges);
}
}).doOnError(new Action1<Throwable>()
{
@Override
public void call(Throwable throwable)
{
if(throwable != null)
subscriber.onError(throwable);
}
}).subscribe();
}
}, 500, CHECK_EXCHANGE_DATA, TimeUnit.MILLISECONDS);
}
})
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(exchangesRequest);
return subscription;
}
private Observable<List<Exchange>> getExchangesObservable()
{
BitcoinAverage bitcoinAverage = provideBitcoinAverage();
return bitcoinAverage.exchanges(USD);
}
private BitcoinAverage provideBitcoinAverage()
{
RestAdapter restAdapter = new RestAdapter.Builder()
.setEndpoint("https://api.bitcoinaverage.com")
.build();
return restAdapter.create(BitcoinAverage.class);
}
private interface BitcoinAverage
{
@GET("/exchanges/{currency}")
Observable<List<Exchange>> exchanges(@Path("currency") String currency);
}
public class Exchange
{
public String display_name;
public String ask;
public String bid;
public String last;
public String source;
public float volume_btc;
public float volume_percent;
public String blue_bid;
public String blue_ask;
public String official_bid;
public String official_ask;
public String date;
}
}
public class ExchangeView extends FrameLayout
{
private Subscription subscription;
private DataService dataService;
public ExchangeView(Context context, AttributeSet attrs)
{
super(context, attrs);
dataService = new DataService();
}
@Override
protected void onFinishInflate()
{
super.onFinishInflate();
getExchangeData();
}
@Override
protected void onDetachedFromWindow()
{
super.onDetachedFromWindow();
if(subscription != null)
subscription.unsubscribe();
}
public void getExchangeData()
{
// should update exchanges on interval, handle errors, and be able to unsubscribe
subscription = dataService.getExchangesTimer(new Observer<List<Exchange>>()
{
@Override
public void onNext(List<Exchange> exchanges) {
// TODO update exchanges on view
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable throwable) {
if (throwable instanceof RetrofitError) {
if (((RetrofitError) throwable).isNetworkError()) {
// TODO handle network error
} else {
// TODO handle service error
}
}
}
});
}
}
@thanksmister

This is my current code, along with the approaches I have tried for creating a subscription that polls. It includes the regular subscription (getExchanges), the subscription using a timer (getExchangesTimer), and the subscription using a scheduler (getExchangesPeriodically).

The timer seems to fire more than once each time it runs, and it stops running on error. The scheduler runs continuously (ignoring the time interval) and eventually causes a memory error in the application. For now I am refreshing the data manually with a runner instead of doing this in RxJava.
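
One likely reason the timer version stops: in Rx, `onError` is a terminal event, so a single failed request that propagates out of `flatMap` ends the whole timer stream. The sketch below shows the general remedy using plain `java.util.concurrent` rather than RxJava: catch the failure inside each tick and report it, so the schedule itself never sees the error. `ResilientPoller`, `Listener`, and the fake request are hypothetical names made up for illustration; none of them come from the gist.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper (not part of the gist): polls a task at a fixed rate and
// reports failures per tick instead of letting them end the schedule.
class ResilientPoller {

    /** Per-poll callbacks; the names are illustrative. */
    interface Listener<T> {
        void onResult(T value);
        void onFailure(Exception error);
    }

    static <T> ScheduledFuture<?> poll(ScheduledExecutorService executor,
                                       final Callable<T> task,
                                       final Listener<T> listener,
                                       long periodMillis) {
        return executor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    listener.onResult(task.call());
                } catch (Exception e) {
                    // Catching here keeps later ticks running: an exception that
                    // escapes run() suppresses all subsequent executions.
                    listener.onFailure(e);
                }
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
    }

    /** Records the first four polls of a fake request whose second call fails. */
    static List<String> demo() {
        final List<String> log = Collections.synchronizedList(new ArrayList<String>());
        final CountDownLatch fourEvents = new CountDownLatch(4);
        final AtomicInteger calls = new AtomicInteger();
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

        Callable<String> flakyRequest = new Callable<String>() {
            @Override
            public String call() throws Exception {
                int n = calls.incrementAndGet();
                if (n == 2) {
                    throw new IllegalStateException("network down");
                }
                return "exchanges #" + n;
            }
        };

        ScheduledFuture<?> handle = poll(executor, flakyRequest, new Listener<String>() {
            @Override
            public void onResult(String value) {
                record("ok: " + value);
            }

            @Override
            public void onFailure(Exception error) {
                record("failed: " + error.getMessage());
            }

            // Only the first four events are kept so the demo output is stable.
            private void record(String line) {
                if (fourEvents.getCount() > 0) {
                    log.add(line);
                    fourEvents.countDown();
                }
            }
        }, 10);

        try {
            fourEvents.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        handle.cancel(false);      // plays the role of unsubscribe()
        executor.shutdownNow();
        return new ArrayList<String>(log);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In RxJava 1.x the analogous move would probably be to handle the error on the inner observable inside `flatMap` (for example with `onErrorResumeNext`), so that it never reaches the timer. That is a suggestion to try, not something verified against this gist.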

@thanksmister

Please note that the API actually returns a Retrofit "Response" that must be converted into Exchange objects. I have written the code as though a list of Exchanges is returned directly, with no extra formatting, but in reality that conversion is required. So this won't run as is.

@thanksmister

I have looked at the notes here (ReactiveX/RxJava#448), but it's not clear how to return the subscription back to the view, and the method outlined there for using a Scheduler is now deprecated in the latest RxJava:

Observable.create({ observer ->
Schedulers.newThread().schedulePeriodically({
observer.onNext("application-state-from-network");
}, 0, 1000, TimeUnit.MILLISECONDS);
}).take(10).subscribe({ v -> println(v) });

It's also hard to reverse engineer a lambda without being familiar with the original.
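
On the question of handing something back to the view: the piece the view needs is a handle whose `unsubscribe()` stops the periodic work. A minimal sketch of that shape, again with plain `java.util.concurrent` instead of RxJava, is below. `PollHandleSketch`, `Handle`, and `startPolling` are hypothetical names; `Handle` only stands in for RxJava's `Subscription`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch (not from the gist): periodic work that returns a
// cancellable handle to the caller.
class PollHandleSketch {

    /** Stand-in for RxJava's Subscription, defined here for illustration. */
    interface Handle {
        void unsubscribe();
        boolean isUnsubscribed();
    }

    static Handle startPolling(final Runnable onTick, long periodMillis) {
        final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        final Object lock = new Object();
        final boolean[] stopped = {false};

        executor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                // The lock guarantees no tick is delivered once unsubscribe() has returned.
                synchronized (lock) {
                    if (!stopped[0]) {
                        onTick.run();
                    }
                }
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);

        return new Handle() {
            @Override
            public void unsubscribe() {
                synchronized (lock) {
                    stopped[0] = true;
                }
                executor.shutdownNow();
            }

            @Override
            public boolean isUnsubscribed() {
                synchronized (lock) {
                    return stopped[0];
                }
            }
        };
    }

    /** Returns {ticks seen at unsubscribe, ticks seen 100 ms later}. */
    static int[] demo() {
        final AtomicInteger ticks = new AtomicInteger();
        final CountDownLatch threeTicks = new CountDownLatch(3);
        Handle handle = startPolling(new Runnable() {
            @Override
            public void run() {
                ticks.incrementAndGet();
                threeTicks.countDown();
            }
        }, 10);
        try {
            threeTicks.await(5, TimeUnit.SECONDS);
            handle.unsubscribe();          // what onDetachedFromWindow() would call
            int atUnsubscribe = ticks.get();
            Thread.sleep(100);             // give any stray tick a chance to show up
            return new int[] {atUnsubscribe, ticks.get()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            handle.unsubscribe();
            int seen = ticks.get();
            return new int[] {seen, seen};
        }
    }

    public static void main(String[] args) {
        int[] counts = demo();
        System.out.println("at unsubscribe: " + counts[0] + ", later: " + counts[1]);
    }
}
```

In RxJava 1.x, `Scheduler.Worker` is itself a `Subscription`, so registering the worker on the subscriber with `subscriber.add(worker)` inside `Observable.create` would likely give the same effect: unsubscribing in the view would then also stop the periodic action. Treat that as a direction to test rather than a confirmed fix.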

@thanksmister

Added a getExchangesWorker method that uses a scheduled worker inside an observable to hit the API periodically. This method works until it encounters an error, after which it stops emitting data.
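
A plausible explanation for the stop: the worker keeps firing, but `subscriber.onError(throwable)` pushes a terminal event downstream, and once the `PublishSubject` has received `onError` it ignores everything that follows. The toy model below imitates only that one rule and compares forwarding the error with wrapping it in a value. `ToySubject`, `Result`, and `run` are hypothetical and are not RxJava classes.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (not RxJava): demonstrates why a forwarded onError silences later polls.
class TerminalEventSketch {

    interface Observer<T> {
        void onNext(T value);
        void onError(Throwable error);
    }

    /** Minimal subject that drops every event after the first onError. */
    static final class ToySubject<T> implements Observer<T> {
        private final List<Observer<T>> observers = new ArrayList<Observer<T>>();
        private boolean terminated;

        void subscribe(Observer<T> observer) {
            observers.add(observer);
        }

        @Override
        public void onNext(T value) {
            if (terminated) {
                return;
            }
            for (Observer<T> observer : observers) {
                observer.onNext(value);
            }
        }

        @Override
        public void onError(Throwable error) {
            if (terminated) {
                return;
            }
            terminated = true;
            for (Observer<T> observer : observers) {
                observer.onError(error);
            }
        }
    }

    /** Value-or-error wrapper so a failed poll can travel through onNext. */
    static final class Result {
        final String value;
        final Throwable error;

        Result(String value, Throwable error) {
            this.value = value;
            this.error = error;
        }
    }

    /** Simulates three polls where the second one fails. */
    static List<String> run(boolean wrapErrors) {
        final List<String> log = new ArrayList<String>();
        ToySubject<Result> subject = new ToySubject<Result>();
        subject.subscribe(new Observer<Result>() {
            @Override
            public void onNext(Result result) {
                log.add(result.error == null
                        ? "next: " + result.value
                        : "failed: " + result.error.getMessage());
            }

            @Override
            public void onError(Throwable error) {
                log.add("terminated: " + error.getMessage());
            }
        });

        String[] responses = {"a", null, "c"};   // null marks the failed poll
        for (String response : responses) {
            if (response != null) {
                subject.onNext(new Result(response, null));
            } else if (wrapErrors) {
                subject.onNext(new Result(null, new RuntimeException("boom")));
            } else {
                subject.onError(new RuntimeException("boom"));
            }
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [next: a, terminated: boom]
        System.out.println(run(true));  // [next: a, failed: boom, next: c]
    }
}
```

If that diagnosis holds for the real code, one option in RxJava 1.x might be to turn each poll's outcome into a value before it reaches the subject, for instance with `materialize()`, or to swallow the error on the inner observable and surface it to the view some other way.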
