Skip to content

Instantly share code, notes, and snippets.

@fabioCollini
Last active August 29, 2015 14:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fabioCollini/65ae1e3a3dbae4623b98 to your computer and use it in GitHub Desktop.
Save fabioCollini/65ae1e3a3dbae4623b98 to your computer and use it in GitHub Desktop.
ObservableQueue
public class MainActivity extends ActionBarActivity {
//...
@Override protected void onStart() {
super.onStart();
subscriptions.add(repoService.getRepoListObservable().subscribe(new Action1<Observable<List<Repo>>>() {
@Override public void call(Observable<List<Repo>> listObservable) {
showProgress();
subscriptions.add(listObservable.subscribe(new Action1<List<Repo>>() {
@Override public void call(List<Repo> repos) {
//update the ui
}
}, new Action1<Throwable>() {
@Override public void call(Throwable throwable) {
//...
}
}));
}
}));
subscriptions.add(repoService.getRepoObservable().subscribe(new Action1<Observable<Repo>>() {
@Override public void call(Observable<Repo> repoObservable) {
repoObservable.subscribe(new Action1<Repo>() {
@Override public void call(Repo repo) {
//update the ui
}
}, new Action1<Throwable>() {
@Override public void call(Throwable t) {
//...
}
});
}
}));
}
@Override protected void onStop() {
subscriptions.unsubscribe();
super.onStop();
}
}
public class ObservableQueue<T> {
private PublishSubject<Observable<T>> publishSubject = PublishSubject.create();
private List<Observable<T>> runningObservables = new ArrayList<Observable<T>>();
private Observable<T> lastCompleted;
private boolean replayLast;
public ObservableQueue(boolean replayLast) {
this.replayLast = replayLast;
}
public void onNext(final ConnectableObservable<T> observable) {
runningObservables.add(observable);
observable.subscribe(new Observer<T>() {
@Override public void onCompleted() {
lastCompleted = observable;
runningObservables.remove(observable);
}
@Override public void onError(Throwable e) {
runningObservables.remove(observable);
}
@Override public void onNext(T t) {
}
});
publishSubject.onNext(observable);
}
public Observable<Observable<T>> asObservable() {
if (!runningObservables.isEmpty()) {
return Observable.concat(Observable.from(runningObservables), publishSubject);
} else if (lastCompleted != null && replayLast) {
return Observable.concat(Observable.just(lastCompleted), publishSubject);
} else {
return publishSubject.asObservable();
}
}
}
public class RepoService {
private GitHubService gitHubService;
private ObservableQueue<Repo> repoQueue = new ObservableQueue<Repo>(false);
private ObservableQueue<List<Repo>> loadQueue = new ObservableQueue<List<Repo>>(true);
public RepoService(GitHubService gitHubService) {
this.gitHubService = gitHubService;
}
public void listRepos(FragmentActivity activity, String queryString) {
Observable<List<Repo>> observable = gitHubService.listReposRx(queryString)
.map(new Func1<RepoResponse, List<Repo>>() {
@Override public List<Repo> call(RepoResponse repoResponse) {
return repoResponse.getItems();
}
});
loadQueue.onNext(RxFragment.bindActivity(activity, observable));
}
public Observable<Observable<List<Repo>>> getRepoListObservable() {
return loadQueue.asObservable();
}
public Observable<Observable<Repo>> getRepoObservable() {
return repoQueue.asObservable();
}
public void toggleStar(FragmentActivity activity, final Repo repo) {
Observable<Repo> observable = Observable
.create(new Observable.OnSubscribe<Repo>() {
@Override public void call(Subscriber<? super Repo> subscriber) {
//...
}
})
.finallyDo(new Action0() {
@Override public void call() {
repo.setUpdating(false);
}
});
repoQueue.onNext(RxFragment.bindActivity(activity, observable));
}
}
//fragment to stop ConnectableObservable when the activity is "really" destroyed
public class RxFragment extends Fragment {
private static final String TAG = "RxFragment";
private CompositeSubscription subscriptions = new CompositeSubscription();
public RxFragment() {
setRetainInstance(true);
}
public static <T> ConnectableObservable<T> bindActivity(FragmentActivity activity, Observable<T> observable) {
Observable<T> background = RxUtils.background(activity, observable);
return bindObservable(activity.getSupportFragmentManager(), background);
}
public static <T> ConnectableObservable<T> bindObservable(FragmentManager fragmentManager, Observable<T> observable) {
RxFragment fragment = (RxFragment) fragmentManager.findFragmentByTag(TAG);
if (fragment == null) {
fragment = new RxFragment();
fragmentManager.beginTransaction().add(fragment, TAG).commit();
}
ConnectableObservable<T> replay = observable.replay(1);
fragment.subscriptions.add(replay.connect());
return replay;
}
@Override public void onDestroy() {
super.onDestroy();
subscriptions.unsubscribe();
}
}
public class RxUtils {
private static Scheduler io = Schedulers.io();
public static <T> Observable<T> background(Activity activity, Observable<T> observable) {
return AndroidObservable.bindActivity(activity, observable.subscribeOn(io));
}
//change scheduler in test methods
public static void setIo(Scheduler io) {
RxUtils.io = io;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment