Skip to content

Instantly share code, notes, and snippets.

// Observable.retryWhen
public final Observable<T> retryWhen(
Function<? super Observable<Throwable>, ? extends ObservableSource<?>> handler)
// Completable.retryWhen
public final Completable retryWhen(
Function<? super Flowable<Throwable>, ? extends Publisher<?>> handler) {
public class RetryCompletableWithDelay implements Function<Completable, Completable> {
// THIS WON'T WORK
}
public class RetryWithDelay implements Function<Observable<? extends Throwable>, Observable<?>> {
private final int maxRetries;
private final int retryDelayMillis;
private int retryCount;
public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
this.maxRetries = maxRetries;
this.retryDelayMillis = retryDelayMillis;
this.retryCount = 0;
}
@Test
fun `observable to single with first() and default value`() {
val observable = Observable.fromIterable<Int>((1..5).toList())
assertObservableToSingle(observable, { first(-1)}) {
assertComplete()
assertValue(1)
}
val emptyObservable = Observable.empty<Int>()
assertObservableToSingle(emptyObservable, { first(-1)}) {
@Throws(UndeliverableException::class)
@Test
fun `observable that throws error after emitting an item to single with firstOrError`() {
RxJavaPlugins.setErrorHandler { System.err.println("An unhandled exception was caught: $it") }
val observable = Observable.create<Int> { emitter ->
emitter.onNext(1)
emitter.onNext(2)
emitter.onError(RuntimeException("Exception while emitting Ints"))
}
@Test
fun `observable that throws error to single with firstOrError`() {
val observable = Observable.error<Int>(RuntimeException("Exception while emitting Ints"))
assertObservableToSingle(observable, { firstOrError() } ) {
assertError(java.lang.RuntimeException::class.java)
}
}
@Test
fun `empty observable to single with firstOrError`() {
val observable = Observable.empty<Int>()
assertObservableToSingle(observable, { firstOrError() } ) {
assertError(NoSuchElementException::class.java)
}
}
@Test
fun `hot observable to single with firstOrError with extracted method`() {
val observable = Observable.fromIterable<Int>((1..5).toList())
assertObservableToSingle(observable, { firstOrError() } ) {
assertComplete()
assertValue(1)
}
}
private fun assertObservableToSingle(
sourceObservable: Observable<Int>,
toSingleTransformation: Observable<Int>.() -> Single<Int>,
assertions: TestObserver<Int>.() -> Unit
) {
val connectableObservable : ConnectableObservable<Int> = sourceObservable
.doOnNext { println("Source observable emitting $it") }
.doOnError { println("Source observable emitted an error: $it") }
.publish()
Source observable emitting 1
Single succeeded with value: 1
Source observable emitting 2
Source observable emitting 3
Source observable emitting 4
Source observable emitting 5