Skip to content

Instantly share code, notes, and snippets.

@fabioCollini
Last active August 29, 2015 14:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fabioCollini/0036f66a87a97295d387 to your computer and use it in GitHub Desktop.
Save fabioCollini/0036f66a87a97295d387 to your computer and use it in GitHub Desktop.
public interface EndlessObserver<T> {
void onNext(T t);
void onNextError(Throwable t);
}
public class EndlessSubject<T> implements Observer<T> {
private PublishSubject<Result<T>> subject = PublishSubject.create();
@Override public void onCompleted() {
}
@Override public void onError(Throwable e) {
subject.onNext(new Result<T>(null, e));
}
@Override public void onNext(T repo) {
subject.onNext(new Result<T>(repo, null));
}
public Subscription subscribe(final EndlessObserver<T> endlessObserver) {
return subject.subscribe(new Observer<Result<T>>() {
@Override public void onCompleted() {
}
@Override public void onError(Throwable e) {
endlessObserver.onNextError(e);
}
@Override public void onNext(Result<T> pair) {
if (pair.throwable != null) {
endlessObserver.onNextError(pair.throwable);
} else {
endlessObserver.onNext(pair.success);
}
}
});
}
private static class Result<T> {
T success;
Throwable throwable;
public Result(T success, Throwable throwable) {
this.success = success;
this.throwable = throwable;
}
}
}
public class MainActivity extends ActionBarActivity {
private Subscription busSubscription;
//...
@Override protected void onStart() {
super.onStart();
busSubscription = repoService.subscribe(new EndlessObserver<Repo>() {
@Override public void onNext(Repo repo) {
repoAdapter.notifyDataSetChanged();
}
@Override public void onNextError(Throwable t) {
repoAdapter.notifyDataSetChanged();
Toast.makeText(MainActivity.this, "Error " + t.getMessage(), Toast.LENGTH_SHORT).show();
}
});
}
@Override protected void onStop() {
busSubscription.unsubscribe();
super.onStop();
}
@OnItemClick(R.id.list) void toggleStarOnItemClick(int position) {
Repo repo = repoAdapter.getItem(position);
repoService.toggleStar(repo);
}
}
public class RepoService {
private EndlessSubject<Repo> subject = new EndlessSubject<Repo>();
public Subscription subscribe(EndlessObserver<Repo> observer) {
return subject.subscribe(observer);
}
public void toggleStar(final Repo repo) {
Observable
.create(new Observable.OnSubscribe<Repo>() {
@Override public void call(Subscriber<? super Repo> subscriber) {
//....
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subject);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment