Last active
March 23, 2018 01:18
-
-
Save fullkomnun/738e86c297842feca099536068aa2706 to your computer and use it in GitHub Desktop.
Implementations of a flatMap that runs repeatedly as a feedback loop when the number of iterations is unknown ahead of time (e.g. pagination). The first implementation is "recursive" (through deferred execution) while the second one is not. The sample code tries to fetch at least 100 integers starting from 1, while each request returns the next n integers when n…
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io.reactivex.Observable | |
import io.reactivex.Single | |
import io.reactivex.schedulers.Schedulers | |
import java.util.* | |
/**
 * Repeatedly feeds a state through [flat], emitting every state it produces, until [until]
 * accepts the current state.
 *
 * Starting from [seedState], [until] is checked first; if it returns `true` the stream completes,
 * so nothing is emitted when the seed itself already satisfies [until]. Otherwise [flat] is
 * applied, its result is emitted, and the loop continues from that result.
 *
 * The "recursion" goes through [Observable.defer]: a step (including its call to [until] and
 * [flat]) is only evaluated when it is subscribed to, i.e. after the previous state was emitted.
 *
 * The result is wrapped in `replay().refCount()`, so concurrent subscribers share one run.
 *
 * @param seedState state the first step starts from; it is never emitted itself.
 * @param flat produces the next state from the current one.
 * @param until stop condition, evaluated against the current state before each step.
 * @return an [Observable] of every state produced by [flat], in order.
 */
fun <T> flatMapUntil_deferred_rec(seedState: T, flat: (T) -> Single<T>, until: (T) -> Boolean): Observable<T> {
    // One step of the loop. Deferred so that neither `until` nor `flat` runs before subscription.
    fun flatMapUntil_rec(state: T): Observable<T> = Observable.defer {
        if (until(state)) {
            Observable.empty<T>()
        } else {
            flat(state).flatMapObservable { next ->
                // Recurse into the local helper (not the outer function, which takes three arguments).
                Observable.just(next).concatWith(flatMapUntil_rec(next))
            }
        }
    }

    return flatMapUntil_rec(seedState).replay().refCount()
}
/**
 * Non-recursive variant of the feedback loop: repeatedly feeds a state through [flat], emitting
 * every state it produces, until [until] accepts the current state.
 *
 * The current state is kept in a mutable local that is updated after every successful [flat]
 * call; `repeatWhen` re-subscribes to the deferred [flat] call as long as [until] rejects it.
 *
 * Notes on the implementation:
 * - The state lives inside [Observable.defer], so every (re)connection made by `refCount()`
 *   starts again from [seedState] instead of continuing from a previous run's last state.
 * - The repeat handler uses `takeWhile { !until(state) }`. `takeUntil(predicate)` forwards the
 *   notification *before* testing the predicate, which would trigger one more [flat] request
 *   after [until] is already satisfied.
 * - If [seedState] already satisfies [until], nothing is requested and the stream is empty.
 *
 * The result is wrapped in `replay().refCount()`, so concurrent subscribers share one run.
 *
 * @param seedState state the first step starts from; it is never emitted itself.
 * @param flat produces the next state from the current one.
 * @param until stop condition, evaluated against the current state before each step.
 * @return an [Observable] of every state produced by [flat], in order.
 */
fun <T> flatMapUntil_non_rec(seedState: T, flat: (T) -> Single<T>, until: (T) -> Boolean): Observable<T> =
    Observable.defer {
        // Per-subscription loop state.
        var state = seedState
        if (until(state)) {
            Observable.empty<T>()
        } else {
            Single.defer { flat(state) }
                .doOnSuccess { state = it }
                // Each completion of the Single asks "go again?"; stop without re-subscribing once done.
                .repeatWhen { completions -> completions.takeWhile { !until(state) } }
                .toObservable()
        }
    }.replay().refCount()
/** Random source used by [generateNumbers]; seeded from the low 64 bits of a freshly generated UUID. */
private val rnd: Random = UUID.randomUUID().let { Random(it.leastSignificantBits) }

/**
 * Simulates one "page" request: produces the consecutive integers starting at `cursor.next`.
 *
 * The page holds between 1 and 15 numbers (the start value plus a random 0..14 extra ones).
 * The request and its result are printed to standard output.
 *
 * @param cursor position to continue from; only its `next` value is used to build the page.
 * @return a [Single] that immediately yields a [Cursor] holding the generated numbers and the
 *   value following the last of them.
 */
fun generateNumbers(cursor: Cursor): Single<Cursor> {
    val first = cursor.next
    val last = first + rnd.nextInt(15)
    val result = Cursor(next = last + 1, nums = (first..last).toList())
    println("$cursor -> $result")
    return Single.just(result)
}
/**
 * One page of results together with the position of the following page.
 *
 * @property next first integer the following request should start from.
 * @property nums integers carried by this page.
 */
data class Cursor(
    val next: Int,
    val nums: List<Int>
)
/**
 * Demo: pages through integers starting at 1 until the cursor has moved past 100, printing each
 * page with the name of the thread that delivered it, then printing all collected numbers.
 */
fun main(args: Array<String>) {
    val seed = Cursor(1, listOf())
    val pages = flatMapUntil_deferred_rec(seed, ::generateNumbers) { cursor -> cursor.next > 100 }

    pages
        .subscribeOn(Schedulers.computation())
        .doOnNext { println("${Thread.currentThread().name} $it") }
        .toList()
        .map { collected -> collected.flatMap { page -> page.nums } }
        .subscribe { nums -> println(nums) }

    // Blocks the main thread until a line is read from stdin; presumably this keeps the process
    // alive while the pipeline runs on the computation scheduler.
    readLine()
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment