This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
fun statefulIntegers() = Flowable.generate( | |
Callable<Int> { 0 }, | |
BiFunction<Int, Emitter<Int>, Int> { index, emitter -> | |
if (index < 100) { | |
emitter.onNext(index) | |
} else { | |
emitter.onComplete() | |
} | |
return@BiFunction index + 1 | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
data class PagedResponse<out T>(@Json(name = "isLastPage") val isLastPage: Boolean, | |
@Json(name = "values") val values: List<T>, | |
@Json(name = "nextPageStart") val nextPageStart: Int) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
interface ServerApi { | |
@GET("/rest/api/1.0/strings/") | |
fun getStrings( | |
@Query("start") start: Int, | |
@Query("pageLimit") pageLimit: Int | |
): Flowable<PagedResponse<String>> | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
fun <TData> pagedDataOf(nextPageStartingFrom: (start: Int) -> Flowable<PagedResponse<TData>>): Flowable<List<TData>> { | |
return Flowable.generate<List<TData>, Int>( | |
Callable<Int> { 0 }, | |
BiFunction<Int, Emitter<List<TData>>, Int> { index, emitter -> | |
nextPageStartingFrom(index) | |
.doOnNext { page -> | |
if (!page.values.isEmpty()) { | |
emitter.onNext(page.values) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
fun <TData> pagedDataOf(nextPageStartingFrom: (start: Int) -> Flowable<PagedResponse<TData>>): Flowable<List<TData>> = | |
Flowables.generateWith({ 0 }) { start, emitter -> | |
nextPageStartingFrom(start) | |
.doOnNext { | |
it.apply { | |
values.takeIf { it.isNotEmpty() }?.let { emitter.onNext(it) } | |
isLastPage.takeIf { it }?.let { emitter.onComplete() } | |
} | |
} | |
.map { it.nextPageStart } |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Flowables private constructor(){ | |
companion object { | |
inline fun <T, S> generateWith(noinline initialState: () -> S, crossinline generator: (S, Emitter<T>) -> S) | |
= Flowable.generate(Callable<S>(initialState), BiFunction<S, Emitter<T>, S> { a, b -> generator.invoke(a, b) }) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
fun sampleCall(api: ServerApi) { | |
pagedDataOf { start -> api.getStrings(start, pageLimit = 20) } | |
.subscribeOn(Schedulers.io()) | |
.subscribe(object: Subscriber<List<String>> { | |
override fun onSubscribe(subscription: Subscription) { | |
subscription.request(2) | |
} | |
override fun onNext(nextPage: List<String>) = System.out.println(nextPage) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.github.luks91.streamnesting | |
import android.util.Log | |
import io.reactivex.Flowable | |
import io.reactivex.schedulers.Schedulers | |
import io.reactivex.subscribers.DefaultSubscriber | |
data class Contact(val firstName: String, | |
val lastName: String, | |
val phoneNumber: String) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
fun save(contacts: Flowable<Contact>) = | |
contacts | |
.observeOn(Schedulers.io()) | |
.subscribe(object : DefaultSubscriber<Contact>() { | |
override fun onStart() = request(1) | |
override fun onNext(contact: Contact) = | |
openConnection<Contact>().use { | |
it.write(contact) | |
request(1) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
override fun onStart() = request(1) |
OlderNewer