Created
June 14, 2017 19:43
-
-
Save juanmc2005/df0c6746c5435b3898ffc05bdd4015b8 to your computer and use it in GitHub Desktop.
Toy RxJava2 Observable providing some utilities for networking use cases
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example.juancoria.rxjavaplayground; | |
import java.util.concurrent.Callable; | |
import java.util.concurrent.TimeUnit; | |
import io.reactivex.Observable; | |
import io.reactivex.Single; | |
import io.reactivex.SingleObserver; | |
import io.reactivex.SingleSource; | |
import io.reactivex.android.schedulers.AndroidSchedulers; | |
import io.reactivex.annotations.NonNull; | |
import io.reactivex.disposables.Disposable; | |
import io.reactivex.functions.Consumer; | |
import io.reactivex.functions.Function; | |
import io.reactivex.functions.Predicate; | |
import io.reactivex.schedulers.Schedulers; | |
/** | |
* Created by juan.coria on 08/06/17. | |
*/ | |
public class NetworkObservable<T> implements SingleSource<T> { | |
@SuppressWarnings("unchecked") | |
public static <A> NetworkObservable<A> wrapping(SingleSource<? extends A> source) { | |
return new NetworkObservable<>(source); | |
} | |
public static <A> NetworkObservable<A> fromEndpointCall(Callable<? extends SingleSource<? extends A>> callable) { | |
try { | |
final SingleSource<? extends A> source = callable.call(); | |
return wrapping(source); | |
} catch (Throwable t) { | |
return wrapping(Single.error(t)); | |
} | |
} | |
private final SingleSource<? extends T> actual; | |
private NetworkObservable(SingleSource<? extends T> actual) { | |
this.actual = actual; | |
} | |
public <A> NetworkObservable<A> map(Function<? super T, ? extends A> mapper) { | |
return wrapping(Single.wrap(actual).map(mapper)); | |
} | |
public <A> NetworkObservable<A> flatMap(Function<? super T, ? extends NetworkObservable<? extends A>> mapper) { | |
return wrapping(Single.wrap(actual).flatMap(mapper)); | |
} | |
public NetworkObservable<T> pollUntil(Predicate<T> predicate) { | |
return wrapping( | |
Observable.interval(1, TimeUnit.SECONDS) | |
.flatMapSingle(x -> Single.wrap(actual)) | |
.takeUntil(predicate) | |
.filter(predicate) | |
.firstOrError() | |
); | |
} | |
public Disposable subscribe(Consumer<T> onNext, Consumer<Throwable> onError) { | |
return Single.wrap(actual) | |
.subscribeOn(Schedulers.io()) | |
.observeOn(AndroidSchedulers.mainThread()) | |
.subscribe(onNext, onError); | |
} | |
@Override | |
public void subscribe(@NonNull SingleObserver<? super T> observer) { | |
Single.wrap(actual) | |
.subscribeOn(Schedulers.io()) | |
.observeOn(AndroidSchedulers.mainThread()) | |
.subscribe(observer); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment