Created
February 23, 2020 08:15
-
-
Save krossovochkin/1a47f05d3ccbf20fffa872b22362e16d to your computer and use it in GitHub Desktop.
From RxJava to Kotlin Flow: Error Handling
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package by.krossovochkin.testflow | |
import io.reactivex.Observable | |
import io.reactivex.exceptions.CompositeException | |
import io.reactivex.schedulers.Schedulers | |
import kotlinx.coroutines.* | |
import kotlinx.coroutines.flow.* | |
import org.junit.Test | |
import java.util.concurrent.CopyOnWriteArrayList | |
import java.util.concurrent.CountDownLatch | |
/**
 * Side-by-side demonstrations of error handling in RxJava 2 and Kotlin Flow.
 *
 * Every `test*` method prints the sequence of events produced by a small failing
 * stream; the comment above each call shows the expected console output.
 */
class Error {

    /** RxJava error-handling operators applied to a source that emits one value and then fails. */
    @Test
    fun testObservables() {
        // next 1, error java.lang.RuntimeException
        testObservable { this }
        // INTERCEPTED java.lang.RuntimeException, next 1, error java.lang.RuntimeException
        testObservable { doOnError { print("INTERCEPTED $it, ") } }
        // next 1, next 5,
        testObservable { onErrorReturn { 5 } }
        // next 1, next 1, next 2, next 3,
        testObservable { onErrorResumeNext(Observable.just(1, 2, 3)) }
        // next 10, error java.lang.RuntimeException
        testInnerObservable { concatMap { observable(it) } }
        // next 10, next 11, next 12, error io.reactivex.exceptions.CompositeException: 3 exceptions occurred.
        testInnerObservable { concatMapDelayError { observable(it) } }
    }

    /** Flow counterparts of the operators exercised in [testObservables]. */
    @Test
    fun testFlows() {
        // next 1, error java.lang.RuntimeException
        testFlow { this }
        // INTERCEPTED java.lang.RuntimeException, next 1, error java.lang.RuntimeException
        testFlow { doOnError { print("INTERCEPTED $it, ") } }
        // next 1, next 5,
        testFlow { catch { emit(5) } }
        // next 1, next 1, next 2, next 3,
        testFlow { catch { emitAll(flowOf(1, 2, 3)) } }
        // next 10, error java.lang.RuntimeException
        testInnerFlow { flatMapConcat { myFlow(it) } }
        // next 10, next 11, next 12, error io.reactivex.exceptions.CompositeException: 3 exceptions occurred.
        testInnerFlow { flatMapConcatDelayError { myFlow(it) } }
    }

    /**
     * Invokes [onError] with the failure of the upstream flow and then rethrows that failure,
     * mirroring RxJava's `doOnError`.
     *
     * Built on the `catch` operator, so only upstream failures are observed: exceptions
     * thrown by the downstream collector are not reported to [onError].
     */
    fun <T> Flow<T>.doOnError(onError: (Throwable) -> Unit): Flow<T> =
        catch { cause ->
            onError(cause)
            throw cause
        }

    /**
     * Maps each value to a flow with [transform] and concatenates the results, postponing
     * failures until the end (see [flattenConcatDelayError]).
     */
    fun <T, R> Flow<T>.flatMapConcatDelayError(transform: suspend (value: T) -> Flow<R>): Flow<R> =
        map(transform).flattenConcatDelayError()

    /**
     * Concatenates the inner flows one after another. A failing inner flow does not stop
     * the concatenation: its error is remembered and the next inner flow is collected.
     * A failure of the outer flow is remembered in the same way.
     *
     * @throws CompositeException after all inner flows have been processed, if at least
     * one error was recorded.
     */
    fun <T> Flow<Flow<T>>.flattenConcatDelayError(): Flow<T> {
        val outer = this
        return flow {
            // Fresh list for every collection, so collecting the result twice does not
            // accumulate errors from the previous run.
            val delayedErrors = mutableListOf<Throwable>()
            outer
                .catch { cause -> delayedErrors += cause }
                .collect { inner ->
                    // `catch` records upstream failures only; a failing or cancelled
                    // downstream collector is never mistaken for an inner-flow error.
                    emitAll(inner.catch { cause -> delayedErrors += cause })
                }
            if (delayedErrors.isNotEmpty()) {
                throw CompositeException(delayedErrors)
            }
        }
    }

    /** Applies [operator] to [myFlow] and prints the resulting events. */
    private fun testFlow(
        operator: Flow<Int>.() -> Flow<Int>
    ) {
        printFlowEvents(myFlow().operator())
    }

    /** Applies [operator] to a flow of 10, 11, 12 and prints the resulting events. */
    private fun testInnerFlow(
        operator: Flow<Int>.() -> Flow<Int>
    ) {
        printFlowEvents(flowOf(10, 11, 12).operator())
    }

    /**
     * Collects [source] on [Dispatchers.IO] and prints every value as `next <value>, `
     * followed by `error <exception>` if the collection fails.
     *
     * `runBlocking` returns only after the whole body, including the error branch, has
     * finished, so the printed line always contains the terminal error.
     */
    private fun printFlowEvents(source: Flow<Int>) {
        val result = StringBuilder()
        runBlocking(Dispatchers.IO) {
            try {
                source.collect { value ->
                    result.append("next $value, ")
                }
            } catch (e: CancellationException) {
                // Cancellation is not a stream error; let it propagate.
                throw e
            } catch (e: Exception) {
                result.append("error $e")
            }
        }
        println(result)
    }

    /** Applies [operator] to an observable of 10, 11, 12 on the IO scheduler and prints the events. */
    private fun testInnerObservable(
        operator: Observable<Int>.() -> Observable<Int>
    ) {
        printObservableEvents(
            Observable.just(10, 11, 12)
                .subscribeOn(Schedulers.io())
                .operator()
        )
    }

    /** Applies [operator] to [observable] on the IO scheduler and prints the events. */
    private fun testObservable(
        operator: Observable<Int>.() -> Observable<Int>
    ) {
        printObservableEvents(
            observable()
                .subscribeOn(Schedulers.io())
                .operator()
        )
    }

    /**
     * Subscribes to [source], blocks the calling thread until it terminates and prints
     * every value as `next <value>, ` followed by `error <exception>` on failure.
     */
    private fun printObservableEvents(source: Observable<Int>) {
        val terminated = CountDownLatch(1)
        val result = StringBuffer()
        source
            // doAfterTerminate runs after the terminal event has been delivered to the
            // subscriber below, so the error text is appended before the latch opens.
            .doAfterTerminate { terminated.countDown() }
            .subscribe(
                { result.append("next $it, ") },
                { result.append("error $it") }
            )
        terminated.await()
        println(result)
    }

    /** Flow that emits [value] once and then fails with a [RuntimeException]. */
    private fun myFlow(
        value: Int = 1
    ): Flow<Int> {
        return flow {
            emit(value)
            throw RuntimeException()
        }
    }

    /** Observable that emits [value] once and then signals a [RuntimeException]. */
    private fun observable(
        value: Int = 1
    ): Observable<Int> {
        return Observable.create { emitter ->
            emitter.onNext(value)
            emitter.onError(RuntimeException())
        }
    }

    /** Shows how the three-callback `subscribe` of RxJava reacts to success and failure. */
    @Test
    fun testObservableSubscribe() {
        // next 1, next 2, next 3, complete
        Observable.just(1, 2, 3)
            .subscribe(
                { print("next $it, ") },
                { print("error $it, ") },
                { print("complete ") }
            )
        println()

        // next 1, error java.lang.RuntimeException,
        Observable
            .create<Int> { emitter ->
                emitter.onNext(1)
                throw RuntimeException()
            }
            .subscribe(
                { print("next $it, ") },
                { print("error $it, ") },
                { print("complete ") }
            )
        println()

        // next 1, then the completion callback itself throws.
        // NOTE(review): RxJava is expected to hand this exception to its global error
        // handler instead of the error callback - confirm against the RxJava version in use.
        Observable
            .create<Int> { emitter ->
                emitter.onNext(1)
                emitter.onComplete()
            }
            .subscribe(
                { print("next $it, ") },
                { print("error $it, ") },
                { throw RuntimeException() }
            )
    }

    /** Same scenarios as [testObservableSubscribe], driven through the Flow [subscribe] helper. */
    @Test
    fun testFlowSubscribe() {
        // next 1, next 2, next 3, complete
        runBlocking {
            flowOf(1, 2, 3)
                .subscribe(
                    { print("next $it, ") },
                    { print("error $it, ") },
                    { print("complete ") }
                )
        }
        println()

        // next 1, error java.lang.RuntimeException,
        runBlocking {
            flow {
                emit(1)
                throw RuntimeException()
            }
                .subscribe(
                    { print("next $it, ") },
                    { print("error $it, ") },
                    { print("complete ") }
                )
        }
        println()

        // next 1, exception (the RuntimeException thrown by the completion callback
        // propagates out of runBlocking)
        runBlocking {
            flow {
                emit(1)
            }
                .subscribe(
                    { print("next $it, ") },
                    { print("error $it, ") },
                    { throw RuntimeException() }
                )
        }
    }

    /**
     * Collects this flow with RxJava-style callbacks.
     *
     * @param onNext called for every emitted value; if it throws, collection stops and the
     * exception is delivered to [onError].
     * @param onError called once if the flow (or [onNext]) fails; [onComplete] is not called
     * in that case.
     * @param onComplete called once after the flow finishes without a failure.
     *
     * Exceptions thrown by [onError] or [onComplete] themselves propagate to the caller.
     */
    @ExperimentalCoroutinesApi
    suspend fun <T> Flow<T>.subscribe(
        onNext: (T) -> Unit,
        onError: (Throwable) -> Unit,
        onComplete: () -> Unit
    ) {
        var failure: Throwable? = null
        onEach { value -> onNext(value) }
            // Placed after onEach so that both upstream failures and failures of onNext
            // are captured here instead of being silently dropped.
            .catch { cause -> failure = cause }
            .collect()

        // Terminal callbacks run outside the flow so their own exceptions are not
        // re-routed into onError.
        val cause = failure
        if (cause == null) {
            onComplete()
        } else {
            onError(cause)
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment