-
-
Save thanksmister/76d8ed3385b9f9f86e74 to your computer and use it in GitHub Desktop.
// data sevice class calling API and returning list of exchanges | |
public class DataService | |
{ | |
private PublishSubject<List<Exchange>> exchangesRequest; | |
public DataService() | |
{} | |
// Get exchanges without any polling which returns results to subscription on time | |
public Subscription getExchanges(final Observer<List<Exchange>> observer) | |
{ | |
// join the request if instantiated | |
if(exchangesRequest != null) { | |
return exchangesRequest.subscribe(observer); | |
} | |
exchangesRequest = PublishSubject.create(); | |
exchangesRequest.subscribe(new Observer<List<Exchange>>() { | |
@Override | |
public void onCompleted() { | |
} | |
@Override | |
public void onError(Throwable throwable) { | |
} | |
@Override | |
public void onNext(List<Exchange> exchanges) { | |
// Cache data in database | |
} | |
}); | |
Subscription subscription = exchangesRequest.subscribe(observer); | |
getExchangesObservable() | |
.subscribeOn(Schedulers.newThread()) | |
.observeOn(AndroidSchedulers.mainThread()) | |
.subscribe(exchangesRequest); | |
return subscription; | |
} | |
// Get exchanges on a timer every 5 minutes, problem with this is stops on error and emmits two results every time it runs | |
public Subscription getExchangesTimer(final Observer<List<Exchange>> observer) | |
{ | |
// join the request if instantiated | |
if(exchangesRequest != null) { | |
return exchangesRequest.subscribe(observer); | |
} | |
exchangesRequest = PublishSubject.create(); | |
exchangesRequest.subscribe(new Observer<List<Exchange>>() { | |
@Override | |
public void onCompleted() { | |
} | |
@Override | |
public void onError(Throwable throwable) { | |
} | |
@Override | |
public void onNext(List<Exchange> exchanges) { | |
// Cache data in database | |
} | |
}); | |
// TODO this returns twice for each time it runs, also stops polling on error | |
Subscription subscription = exchangesRequest.subscribe(observer); | |
Observable.timer(0, (2 * 60 * 1000), TimeUnit.MILLISECONDS).timeInterval() | |
.flatMap(new Func1<TimeInterval<Long>, Observable<List<Exchange>>>() { | |
@Override | |
public Observable<List<Exchange>> call(TimeInterval<Long> longTimeInterval) { | |
return getExchangesObservable(); | |
} | |
}) | |
.subscribeOn(Schedulers.newThread()) | |
.observeOn(AndroidSchedulers.mainThread()) | |
.subscribe(exchangesRequest); | |
return subscription; | |
} | |
// get exchanges scheduled periodically, problem with this approach is runs continuously ignoring interval and explodes application | |
public Subscription getExchangesPeriodically(final Observer<List<Exchange>> observer) | |
{ | |
// join the request if instantiated | |
if(exchangesRequest != null) { | |
return exchangesRequest.subscribe(observer); | |
} | |
exchangesRequest = PublishSubject.create(); | |
exchangesRequest.subscribe(new Observer<List<Exchange>>() { | |
@Override | |
public void onCompleted() { | |
} | |
@Override | |
public void onError(Throwable throwable) { | |
} | |
@Override | |
public void onNext(List<Exchange> exchanges) { | |
// Cache data in database | |
} | |
}); | |
Subscription subscription = exchangesRequest.subscribe(observer); | |
Scheduler.Worker worker = Schedulers.newThread().createWorker(); | |
worker.schedulePeriodically(new Action0() { | |
@Override | |
public void call() { | |
getExchangesObservable() | |
.subscribeOn(Schedulers.newThread()) | |
.observeOn(AndroidSchedulers.mainThread()) | |
.subscribe(exchangesRequest); | |
} | |
}, 0, (2 * 60 * 1000), TimeUnit.MILLISECONDS); | |
return subscription; | |
} | |
// Uses a Schedulers.io().createWorker() to run the service call periodically. | |
public Subscription getExchangesWorker(final Observer<List<Exchange>> observer) | |
{ | |
// join the request if instantiated | |
if(exchangesRequest != null) { | |
return exchangesRequest.subscribe(observer); | |
} | |
exchangesRequest = PublishSubject.create(); | |
exchangesRequest.subscribe(new Observer<List<Exchange>>() { | |
@Override | |
public void onCompleted() { | |
} | |
@Override | |
public void onError(Throwable throwable) { | |
} | |
@Override | |
public void onNext(List<Exchange> exchanges) { | |
// Cache data in database | |
} | |
}); | |
Subscription subscription = exchangesRequest.subscribe(observer); | |
Observable.create(new Observable.OnSubscribe<List<Exchange>>() | |
{ | |
@Override | |
public void call(Subscriber<? super List<Exchange>> subscriber) | |
{ | |
Schedulers.io().createWorker() | |
.schedulePeriodically(new Action0() | |
{ | |
@Override | |
public void call() | |
{ | |
getExchangesObservable().doOnNext(new Action1<List<Exchange>>() | |
{ | |
@Override | |
public void call(List<Exchange> exchanges) | |
{ | |
subscriber.onNext(exchanges); | |
} | |
}).doOnError(new Action1<Throwable>() | |
{ | |
@Override | |
public void call(Throwable throwable) | |
{ | |
if(throwable != null) | |
subscriber.onError(throwable); | |
} | |
}).subscribe(); | |
} | |
}, 500, CHECK_EXCHANGE_DATA, TimeUnit.MILLISECONDS); | |
} | |
}) | |
.subscribeOn(Schedulers.newThread()) | |
.observeOn(AndroidSchedulers.mainThread()) | |
.subscribe(exchangesRequest); | |
return subscription; | |
} | |
private Observable<List<Exchange>> getExchangesObservable() | |
{ | |
BitcoinAverage bitcoinAverage = provideBitcoinAverage(); | |
return bitcoinAverage.exchanges(USD) | |
} | |
private BitcoinAverage provideBitcoinAverage() | |
{ | |
RestAdapter restAdapter = new RestAdapter.Builder() | |
.setEndpoint("https://api.bitcoinaverage.com") | |
.build(); | |
return restAdapter.create(BitcoinAverage.class); | |
} | |
private interface BitcoinAverage | |
{ | |
@GET("/exchanges/{currency}") | |
Observable<List<Exchange>> exchanges(@Path("currency") String currency); | |
} | |
public class Exchange | |
{ | |
public String display_name; | |
public String ask; | |
public String bid; | |
public String last; | |
public String source; | |
public float volume_btc; | |
public float volume_percent; | |
public String blue_bid; | |
public String blue_ask; | |
public String official_bid; | |
public String official_ask; | |
public String date; | |
} | |
} |
public class ExchangeView extends FrameLayout | |
{ | |
private Subscription subscription; | |
private DataService dataService; | |
public ExchangeView(Context context, AttributeSet attrs) | |
{ | |
super(context, attrs); | |
dataService = new DataServices(); | |
} | |
@Override | |
protected void onFinishInflate() | |
{ | |
super.onFinishInflate(); | |
getExchangeData(); | |
} | |
@Override | |
protected void onDetachedFromWindow() | |
{ | |
super.onDetachedFromWindow(); | |
if(subscription != null) | |
subscription.unsubscribe(); | |
} | |
public void getExchangeData() | |
{ | |
// should update exchanges on interval, handle errors, and be able to unsubscribe | |
subscription = dataService.getExchangesTimer(new Observer<List<Exchange>>() | |
{ | |
@Override | |
public void onNext(List<Exchange> exchanges) { | |
// TODO update exchanges on view | |
} | |
@Override | |
public void onCompleted() { | |
} | |
@Override | |
public void onError(Throwable throwable) { | |
if (throwable instanceof RetrofitError) { | |
if (((RetrofitError) throwable).isNetworkError()) { | |
// TODO handle network error | |
} else { | |
// TODO handle service error | |
} | |
} | |
} | |
} | |
} | |
} |
Please note that the API returns a Retrofit "Response" that must be converted into a list of Exchange objects. I have made it look as though the Exchanges are returned in a list without any extra formatting, but in reality they do need it, so this code won't run as is.
I have looked at the notes here (ReactiveX/RxJava#448), but it's not clear how to return the subscription back to the view, and the method outlined there for using a Scheduler is now deprecated in the latest RxJava:
Observable.create({ observer ->
Schedulers.newThread().schedulePeriodically({
observer.onNext("application-state-from-network");
}, 0, 1000, TimeUnit.MILLISECONDS);
}).take(10).subscribe({ v -> println(v) });
It's also hard to reverse engineer a lambda without being familiar with the original.
Added a getExchangesWorker method that uses a scheduled worker within an observable to hit the API periodically. This method seems to work until it encounters an error, and then it stops emitting data.
This is what I have currently and also what I have tried when creating a subscription that has polling. You can see the regular subscription (getExchanges), the subscription using a timer (getExchangesTimer) and the subscription that uses a scheduler (getExchangesPeriodically).
The Timer seems to fire more than once and stops running on error. The Scheduler runs continuously (ignoring the time interval) and eventually causes a memory error in the application. Right now I am manually refreshing the data using a runner instead of trying to do this in RxJava.