Skip to content

Instantly share code, notes, and snippets.

@juanmc2005
Created June 14, 2017 19:43
Show Gist options
  • Save juanmc2005/df0c6746c5435b3898ffc05bdd4015b8 to your computer and use it in GitHub Desktop.
Save juanmc2005/df0c6746c5435b3898ffc05bdd4015b8 to your computer and use it in GitHub Desktop.
Toy RxJava2 Observable providing some utilities for networking use cases
package com.example.juancoria.rxjavaplayground;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import io.reactivex.Observable;
import io.reactivex.Single;
import io.reactivex.SingleObserver;
import io.reactivex.SingleSource;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;
import io.reactivex.schedulers.Schedulers;
/**
* Created by juan.coria on 08/06/17.
*/
public class NetworkObservable<T> implements SingleSource<T> {
@SuppressWarnings("unchecked")
public static <A> NetworkObservable<A> wrapping(SingleSource<? extends A> source) {
return new NetworkObservable<>(source);
}
public static <A> NetworkObservable<A> fromEndpointCall(Callable<? extends SingleSource<? extends A>> callable) {
try {
final SingleSource<? extends A> source = callable.call();
return wrapping(source);
} catch (Throwable t) {
return wrapping(Single.error(t));
}
}
private final SingleSource<? extends T> actual;
private NetworkObservable(SingleSource<? extends T> actual) {
this.actual = actual;
}
public <A> NetworkObservable<A> map(Function<? super T, ? extends A> mapper) {
return wrapping(Single.wrap(actual).map(mapper));
}
public <A> NetworkObservable<A> flatMap(Function<? super T, ? extends NetworkObservable<? extends A>> mapper) {
return wrapping(Single.wrap(actual).flatMap(mapper));
}
public NetworkObservable<T> pollUntil(Predicate<T> predicate) {
return wrapping(
Observable.interval(1, TimeUnit.SECONDS)
.flatMapSingle(x -> Single.wrap(actual))
.takeUntil(predicate)
.filter(predicate)
.firstOrError()
);
}
public Disposable subscribe(Consumer<T> onNext, Consumer<Throwable> onError) {
return Single.wrap(actual)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(onNext, onError);
}
@Override
public void subscribe(@NonNull SingleObserver<? super T> observer) {
Single.wrap(actual)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment