Skip to content

Instantly share code, notes, and snippets.

@f3401pal
Last active December 20, 2018 22:29
Show Gist options
  • Save f3401pal/79f8351d1bc1422bf25eedc76a4900df to your computer and use it in GitHub Desktop.
Save f3401pal/79f8351d1bc1422bf25eedc76a4900df to your computer and use it in GitHub Desktop.
RxKotlin Flowable from staged completables (draft)
import io.reactivex.Completable
import io.reactivex.Flowable
object FlowableEx {
    /**
     * Creates a [Flowable] that runs the given completables one after another and emits one item
     * per completed stage.
     *
     * The completables are run in sequence, in the order they were passed in. Each time a
     * completable completes, [stateProvider] is invoked with that completable's index and the
     * value it returns is emitted. If a completable fails, its error is signalled downstream and
     * the remaining completables are NOT run (nor is [stateProvider] invoked for them).
     *
     * For example:
     * ```
     * FlowableEx.fromCompletables(developing, review, testing) { index ->
     *     when (index) {
     *         0 -> "done developing, PR created"
     *         1 -> "PR review done and merged"
     *         2 -> "testing done, released"
     *         else -> error("unexpected stage $index")
     *     }
     * }
     * ```
     *
     * Note: every stage is awaited with a blocking call, so the thread on which the returned
     * Flowable emits is blocked until the current completable terminates.
     *
     * @param completables the stages to execute, in order; may be empty, in which case the
     * returned Flowable just completes.
     * @param stateProvider maps the index of a completed stage to the item to emit for it.
     * @return a Flowable emitting one item per successfully completed stage, in stage order.
     */
    fun <R> fromCompletables(vararg completables: Completable, stateProvider: (Int) -> R): Flowable<R> {
        // Snapshot the stages so later changes to the caller's array cannot affect the stream.
        val stages = completables.toList()
        return Flowable.range(0, stages.size).map { index ->
            // blockingGet() blocks the emitting thread until the stage terminates and returns
            // the failure, or null when the stage completed normally.
            val failure: Throwable? = stages[index].blockingGet()
            if (failure != null) {
                // Rethrowing inside map() turns the failure into onError and stops the sequence.
                throw failure
            }
            // Ask for the stage result only now that the stage has actually completed.
            stateProvider(index)
        }
    }
}
import com.nhaarman.mockito_kotlin.mock
import com.nhaarman.mockito_kotlin.never
import com.nhaarman.mockito_kotlin.verify
import io.reactivex.Completable
import io.reactivex.schedulers.Schedulers
import io.reactivex.schedulers.TestScheduler
import io.reactivex.subscribers.TestSubscriber
import org.junit.Test
import java.util.concurrent.TimeUnit
/**
 * Tests for [FlowableEx.fromCompletables].
 *
 * The stream under test waits for each stage with a blocking call, so these tests use real
 * schedulers with short delays; a virtual-time scheduler could never be advanced while the
 * test thread is blocked inside the stream.
 */
class RxExtensionTest {

    @Test
    fun `fromCompletables@Flowable chains the complatables and return a result in each stage`() {
        FlowableEx.fromCompletables(
            Completable.complete(),
            Completable.complete(),
            Completable.complete()
        ) { it }
            .test()
            .assertValues(0, 1, 2)
            .assertComplete()
    }

    @Test
    fun `fromCompletables@Flowable breaks out of the stream when one completable failed`() {
        val error = RuntimeException()

        FlowableEx.fromCompletables(
            Completable.complete(),
            Completable.error(error),
            Completable.complete()
        ) { it }
            .test()
            // Only the stage before the failing one produces a value.
            .assertValues(0)
            .assertError(error)
            .assertNotComplete()
    }

    @Test
    fun `fromCompletables@Flowable executes the completables in order`() {
        val scheduler = Schedulers.single()

        // The first stage is deliberately slower than the second: results must still arrive in
        // stage order. Millisecond delays keep the unit test fast.
        FlowableEx.fromCompletables(
            Completable.timer(50, TimeUnit.MILLISECONDS, scheduler),
            Completable.timer(20, TimeUnit.MILLISECONDS, scheduler),
            Completable.complete()
        ) { it }
            .test()
            .assertValues(0, 1, 2)
            .assertComplete()
    }

    @Test
    fun `fromCompletables@Flowable stops execution when disposed`() {
        // The subscriber is created up front so the first stage can dispose it while the
        // (synchronous) subscription is still in progress.
        val subscriber = TestSubscriber<Int>()
        val secondStage: () -> Unit = mock()

        FlowableEx.fromCompletables(
            Completable.fromAction { subscriber.dispose() },
            Completable.fromAction { secondStage() }
        ) { it }.subscribe(subscriber)

        // After disposal the next stage must never be executed and no terminal event is sent.
        verify(secondStage, never()).invoke()
        subscriber.assertNotTerminated()
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment