Last active
August 29, 2015 14:06
-
-
Save fabioCollini/65ae1e3a3dbae4623b98 to your computer and use it in GitHub Desktop.
ObservableQueue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class MainActivity extends ActionBarActivity { | |
//... | |
@Override protected void onStart() { | |
super.onStart(); | |
subscriptions.add(repoService.getRepoListObservable().subscribe(new Action1<Observable<List<Repo>>>() { | |
@Override public void call(Observable<List<Repo>> listObservable) { | |
showProgress(); | |
subscriptions.add(listObservable.subscribe(new Action1<List<Repo>>() { | |
@Override public void call(List<Repo> repos) { | |
//update the ui | |
} | |
}, new Action1<Throwable>() { | |
@Override public void call(Throwable throwable) { | |
//... | |
} | |
})); | |
} | |
})); | |
subscriptions.add(repoService.getRepoObservable().subscribe(new Action1<Observable<Repo>>() { | |
@Override public void call(Observable<Repo> repoObservable) { | |
repoObservable.subscribe(new Action1<Repo>() { | |
@Override public void call(Repo repo) { | |
//update the ui | |
} | |
}, new Action1<Throwable>() { | |
@Override public void call(Throwable t) { | |
//... | |
} | |
}); | |
} | |
})); | |
} | |
@Override protected void onStop() { | |
subscriptions.unsubscribe(); | |
super.onStop(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class ObservableQueue<T> { | |
private PublishSubject<Observable<T>> publishSubject = PublishSubject.create(); | |
private List<Observable<T>> runningObservables = new ArrayList<Observable<T>>(); | |
private Observable<T> lastCompleted; | |
private boolean replayLast; | |
public ObservableQueue(boolean replayLast) { | |
this.replayLast = replayLast; | |
} | |
public void onNext(final ConnectableObservable<T> observable) { | |
runningObservables.add(observable); | |
observable.subscribe(new Observer<T>() { | |
@Override public void onCompleted() { | |
lastCompleted = observable; | |
runningObservables.remove(observable); | |
} | |
@Override public void onError(Throwable e) { | |
runningObservables.remove(observable); | |
} | |
@Override public void onNext(T t) { | |
} | |
}); | |
publishSubject.onNext(observable); | |
} | |
public Observable<Observable<T>> asObservable() { | |
if (!runningObservables.isEmpty()) { | |
return Observable.concat(Observable.from(runningObservables), publishSubject); | |
} else if (lastCompleted != null && replayLast) { | |
return Observable.concat(Observable.just(lastCompleted), publishSubject); | |
} else { | |
return publishSubject.asObservable(); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class RepoService { | |
private GitHubService gitHubService; | |
private ObservableQueue<Repo> repoQueue = new ObservableQueue<Repo>(false); | |
private ObservableQueue<List<Repo>> loadQueue = new ObservableQueue<List<Repo>>(true); | |
public RepoService(GitHubService gitHubService) { | |
this.gitHubService = gitHubService; | |
} | |
public void listRepos(FragmentActivity activity, String queryString) { | |
Observable<List<Repo>> observable = gitHubService.listReposRx(queryString) | |
.map(new Func1<RepoResponse, List<Repo>>() { | |
@Override public List<Repo> call(RepoResponse repoResponse) { | |
return repoResponse.getItems(); | |
} | |
}); | |
loadQueue.onNext(RxFragment.bindActivity(activity, observable)); | |
} | |
public Observable<Observable<List<Repo>>> getRepoListObservable() { | |
return loadQueue.asObservable(); | |
} | |
public Observable<Observable<Repo>> getRepoObservable() { | |
return repoQueue.asObservable(); | |
} | |
public void toggleStar(FragmentActivity activity, final Repo repo) { | |
Observable<Repo> observable = Observable | |
.create(new Observable.OnSubscribe<Repo>() { | |
@Override public void call(Subscriber<? super Repo> subscriber) { | |
//... | |
} | |
}) | |
.finallyDo(new Action0() { | |
@Override public void call() { | |
repo.setUpdating(false); | |
} | |
}); | |
repoQueue.onNext(RxFragment.bindActivity(activity, observable)); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//fragment to stop ConnectableObservable when the activity is "really" destroyed | |
public class RxFragment extends Fragment { | |
private static final String TAG = "RxFragment"; | |
private CompositeSubscription subscriptions = new CompositeSubscription(); | |
public RxFragment() { | |
setRetainInstance(true); | |
} | |
public static <T> ConnectableObservable<T> bindActivity(FragmentActivity activity, Observable<T> observable) { | |
Observable<T> background = RxUtils.background(activity, observable); | |
return bindObservable(activity.getSupportFragmentManager(), background); | |
} | |
public static <T> ConnectableObservable<T> bindObservable(FragmentManager fragmentManager, Observable<T> observable) { | |
RxFragment fragment = (RxFragment) fragmentManager.findFragmentByTag(TAG); | |
if (fragment == null) { | |
fragment = new RxFragment(); | |
fragmentManager.beginTransaction().add(fragment, TAG).commit(); | |
} | |
ConnectableObservable<T> replay = observable.replay(1); | |
fragment.subscriptions.add(replay.connect()); | |
return replay; | |
} | |
@Override public void onDestroy() { | |
super.onDestroy(); | |
subscriptions.unsubscribe(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class RxUtils { | |
private static Scheduler io = Schedulers.io(); | |
public static <T> Observable<T> background(Activity activity, Observable<T> observable) { | |
return AndroidObservable.bindActivity(activity, observable.subscribeOn(io)); | |
} | |
//change scheduler in test methods | |
public static void setIo(Scheduler io) { | |
RxUtils.io = io; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment