Skip to content

Instantly share code, notes, and snippets.

@krossovochkin
Created February 23, 2020 08:15
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save krossovochkin/1a47f05d3ccbf20fffa872b22362e16d to your computer and use it in GitHub Desktop.
Save krossovochkin/1a47f05d3ccbf20fffa872b22362e16d to your computer and use it in GitHub Desktop.
From RxJava to Kotlin Flow: Error Handling
package by.krossovochkin.testflow
import io.reactivex.Observable
import io.reactivex.exceptions.CompositeException
import io.reactivex.schedulers.Schedulers
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import org.junit.Test
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.CountDownLatch
class Error {
@Test
fun testObservables() {
// next 1, error java.lang.RuntimeException
testObservable { this }
// INTERCEPTED java.lang.RuntimeException, next 1, error java.lang.RuntimeException
testObservable { doOnError { print("INTERCEPTED $it, ") } }
// next 1, next 5,
testObservable { onErrorReturn { 5 } }
// next 1, next 1, next 2, next 3,
testObservable { onErrorResumeNext(Observable.just(1, 2, 3)) }
// next 10, error java.lang.RuntimeException
testInnerObservable { concatMap { observable(it) } }
// next 10, next 11, next 12, error io.reactivex.exceptions.CompositeException: 3 exceptions occurred.
testInnerObservable { concatMapDelayError { observable(it) } }
}
@Test
fun testFlows() {
// next 1, error java.lang.RuntimeException
testFlow { this }
// INTERCEPTED java.lang.RuntimeException, next 1, error java.lang.RuntimeException
testFlow { doOnError { print("INTERCEPTED $it, ") } }
// next 1, next 5,
testFlow { catch { emit(5) } }
// next 1, next 1, next 2, next 3,
testFlow { catch { emitAll(flowOf(1, 2, 3)) } }
// next 10, error java.lang.RuntimeException
testInnerFlow { flatMapConcat { myFlow(it) } }
// next 10, next 11, next 12, error io.reactivex.exceptions.CompositeException: 3 exceptions occurred.
testInnerFlow { flatMapConcatDelayError { myFlow(it) } }
}
fun <T> Flow<T>.doOnError(onError: (Throwable) -> Unit): Flow<T> {
return flow {
try {
collect { value ->
emit(value)
}
} catch (e: Exception) {
onError(e)
throw e
}
}
}
fun <T, R> Flow<T>.flatMapConcatDelayError(transform: suspend (value: T) -> Flow<R>): Flow<R> =
map(transform).flattenConcatDelayError()
fun <T> Flow<Flow<T>>.flattenConcatDelayError(): Flow<T> {
val list = CopyOnWriteArrayList<Exception>()
return flow<T> {
collect { value ->
try {
emitAll(value)
} catch (e: Exception) {
list.add(e)
}
}
}.onCompletion {
if (list.isNotEmpty()) {
throw CompositeException(list)
}
}
}
private fun testFlow(
operator: Flow<Int>.() -> Flow<Int>
) {
val latch = CountDownLatch(1)
val result = StringBuffer()
CoroutineScope(Job() + Dispatchers.IO).launch {
try {
myFlow()
.operator()
.onCompletion { latch.countDown() }
.collect {
result.append("next $it, ")
}
} catch (e: Exception) {
result.append("error $e")
}
}
latch.await()
println(result)
}
private fun testInnerFlow(
operator: Flow<Int>.() -> Flow<Int>
) {
val latch = CountDownLatch(1)
val result = StringBuffer()
CoroutineScope(Job() + Dispatchers.IO).launch {
try {
flowOf(10, 11, 12)
.operator()
.onCompletion { latch.countDown() }
.collect {
result.append("next $it, ")
}
} catch (e: Exception) {
result.append("error $e")
}
}
latch.await()
println(result)
}
private fun testInnerObservable(
operator: Observable<Int>.() -> Observable<Int>
) {
val latch = CountDownLatch(1)
val result = StringBuffer()
Observable.just(10, 11, 12)
.subscribeOn(Schedulers.io())
.operator()
.doOnTerminate { latch.countDown() }
.subscribe(
{ result.append("next $it, ") },
{ result.append("error $it") }
)
latch.await()
println(result)
}
private fun testObservable(
operator: Observable<Int>.() -> Observable<Int>
) {
val latch = CountDownLatch(1)
val result = StringBuffer()
observable()
.subscribeOn(Schedulers.io())
.operator()
.doOnTerminate { latch.countDown() }
.subscribe(
{ result.append("next $it, ") },
{ result.append("error $it") }
)
latch.await()
println(result)
}
private fun myFlow(
value: Int = 1
): Flow<Int> {
return flow {
emit(value)
throw RuntimeException()
}
}
private fun observable(
value: Int = 1
): Observable<Int> {
return Observable.create { emitter ->
emitter.onNext(value)
emitter.onError(RuntimeException())
}
}
@Test
fun testObservableSubscribe() {
// next 1, next 2, next 3, complete
Observable.just(1, 2, 3)
.subscribe(
{ print("next $it, ") },
{ print("error $it, ") },
{ print("complete ") }
)
println()
// next 1, error java.lang.RuntimeException,
Observable
.create<Int> { emitter ->
emitter.onNext(1)
throw RuntimeException()
}
.subscribe(
{ print("next $it, ") },
{ print("error $it, ") },
{ print("complete ") }
)
println()
Observable
.create<Int> { emitter ->
emitter.onNext(1)
emitter.onComplete()
}
.subscribe(
{ print("next $it, ") },
{ print("error $it, ") },
{ throw RuntimeException() }
)
}
@Test
fun testFlowSubscribe() {
// next 1, next 2, next 3, complete
runBlocking {
flowOf(1, 2, 3)
.subscribe(
{ print("next $it, ") },
{ print("error $it, ") },
{ print("complete ") }
)
}
println()
// next 1, complete error java.lang.RuntimeException,
runBlocking {
flow {
emit(1)
throw RuntimeException()
}
.subscribe(
{ print("next $it, ") },
{ print("error $it, ") },
{ print("complete ") }
)
}
println()
// next 1, exception
runBlocking {
flow {
emit(1)
}
.subscribe(
{ print("next $it, ") },
{ print("error $it, ") },
{ throw RuntimeException() }
)
}
}
@ExperimentalCoroutinesApi
suspend fun <T> Flow<T>.subscribe(
onNext: (T) -> Unit,
onError: (Throwable) -> Unit,
onComplete: () -> Unit
) {
this
.catch {}
.onEach { value -> onNext(value) }
.onCompletion { error: Throwable? ->
if (error == null) {
onComplete()
} else {
onError(error)
}
}
.collect()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment