Skip to content

Instantly share code, notes, and snippets.

@jonas-grgt
Last active June 13, 2017 08:05
Show Gist options
  • Save jonas-grgt/1c74a3374a217c2e249ee63614a9c56e to your computer and use it in GitHub Desktop.
Save jonas-grgt/1c74a3374a217c2e249ee63614a9c56e to your computer and use it in GitHub Desktop.
Abstraction above Observable to make it more functional
import android.support.annotation.NonNull;
import com.annimon.stream.function.Consumer;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.Subscription;
public class RetroObservable<T> {
private final Observable<T> observable;
private final SchedulerProvider schedulers;
private Runnable onCompleted;
private Consumer<T> onCompletedWithResult;
private Consumer<T> onNext;
private Consumer<Throwable> onError;
public RetroObservable(Observable<T> observable, SchedulerProvider schedulers) {
this.observable = observable;
this.schedulers = schedulers;
}
public RetroObservable<T> onCompleted(Runnable completed) {
this.onCompleted = completed;
return this;
}
public RetroObservable<T> onCompleted(Consumer<T> onCompletedWithResult) {
this.onCompletedWithResult = onCompletedWithResult;
return this;
}
public RetroObservable<T> onNext(Consumer<T> onNext) {
this.onNext = onNext;
return this;
}
public RetroObservable<T> onError(Consumer<Throwable> onError) {
this.onError = onError;
return this;
}
public Subscription subscribe() {
return observable.subscribeOn(schedulers.io())
.observeOn(schedulers.ui())
.subscribe(new Subscriber<T>() {
private T result;
@Override
public void onCompleted() {
if (onCompleted != null)
onCompleted.run();
if(onCompletedWithResult != null)
onCompletedWithResult.accept(result);
}
@Override
public void onError(Throwable e) {
if (onError != null)
onError.accept(e);
}
@Override
public void onNext(T result) {
this.result = result;
if (onNext != null)
onNext.accept(result);
}
});
}
public interface SchedulerProvider {
@NonNull
Scheduler computation();
@NonNull
Scheduler io();
@NonNull
Scheduler ui();
@NonNull
Scheduler immediate();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment