Skip to content

Instantly share code, notes, and snippets.

/**
 * Creates a [Flowable] that emits the integers 0 through 99 in order and then completes.
 *
 * The current index is carried as the generator state: it starts at 0 and the
 * generator returns `index + 1` as the state for the next invocation.
 *
 * @return a stream of the integers 0..99 followed by a completion signal.
 */
fun statefulIntegers(): Flowable<Int> = Flowable.generate(
        Callable<Int> { 0 },
        BiFunction<Int, Emitter<Int>, Int> { index, emitter ->
            // Each generator invocation signals exactly once: either the next value or completion.
            if (index < 100) {
                emitter.onNext(index)
            } else {
                emitter.onComplete()
            }
            // The lambda's last expression is the state handed to the next invocation.
            index + 1
        }
)
/**
 * One page of a paged server response.
 *
 * Each property is bound to the JSON key named in its `@Json` annotation
 * (the annotation looks like Moshi's - confirm against the file's imports).
 *
 * @param T type of the elements carried by the page.
 * @property isLastPage value of the `isLastPage` JSON key.
 * @property values elements contained in this page, read from the `values` JSON key.
 * @property nextPageStart value of the `nextPageStart` JSON key.
 */
data class PagedResponse<out T>(
        @Json(name = "isLastPage") val isLastPage: Boolean,
        @Json(name = "values") val values: List<T>,
        @Json(name = "nextPageStart") val nextPageStart: Int
)
/**
 * Declarative description of the server's HTTP API.
 *
 * NOTE(review): `@GET` / `@Query` look like Retrofit annotations - confirm against the imports.
 */
interface ServerApi {

    /**
     * Requests one page of strings from `/rest/api/1.0/strings/`.
     *
     * @param start sent as the `start` query parameter.
     * @param pageLimit sent as the `pageLimit` query parameter.
     * @return a [Flowable] carrying the [PagedResponse] of strings.
     */
    @GET("/rest/api/1.0/strings/")
    fun getStrings(@Query("start") start: Int,
                   @Query("pageLimit") pageLimit: Int): Flowable<PagedResponse<String>>
}
fun <TData> pagedDataOf(nextPageStartingFrom: (start: Int) -> Flowable<PagedResponse<TData>>): Flowable<List<TData>> {
return Flowable.generate<List<TData>, Int>(
Callable<Int> { 0 },
BiFunction<Int, Emitter<List<TData>>, Int> { index, emitter ->
nextPageStartingFrom(index)
.doOnNext { page ->
if (!page.values.isEmpty()) {
emitter.onNext(page.values)
}
fun <TData> pagedDataOf(nextPageStartingFrom: (start: Int) -> Flowable<PagedResponse<TData>>): Flowable<List<TData>> =
Flowables.generateWith({ 0 }) { start, emitter ->
nextPageStartingFrom(start)
.doOnNext {
it.apply {
values.takeIf { it.isNotEmpty() }?.let { emitter.onNext(it) }
isLastPage.takeIf { it }?.let { emitter.onComplete() }
}
}
.map { it.nextPageStart }
/**
 * Holder for [Flowable] factory helpers; it cannot be instantiated.
 */
class Flowables private constructor() {

    companion object {

        /**
         * Lambda-based front end for the two-argument `Flowable.generate`.
         *
         * Wraps [initialState] in a [Callable] and [generator] in a `BiFunction`
         * and passes both to `Flowable.generate`.
         *
         * @param initialState supplies the initial generator state.
         * @param generator receives the current state and the emitter, and returns the next state.
         * @return the [Flowable] produced by `Flowable.generate`.
         */
        inline fun <T, S> generateWith(
                noinline initialState: () -> S,
                crossinline generator: (S, Emitter<T>) -> S
        ) = Flowable.generate(
                Callable<S> { initialState() },
                BiFunction<S, Emitter<T>, S> { state, emitter -> generator(state, emitter) }
        )
    }
}
fun sampleCall(api: ServerApi) {
pagedDataOf { start -> api.getStrings(start, pageLimit = 20) }
.subscribeOn(Schedulers.io())
.subscribe(object: Subscriber<List<String>> {
override fun onSubscribe(subscription: Subscription) {
subscription.request(2)
}
override fun onNext(nextPage: List<String>) = System.out.println(nextPage)
@luks91
luks91 / Example1.kt
Last active November 23, 2017 14:15
package com.github.luks91.streamnesting
import android.util.Log
import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers
import io.reactivex.subscribers.DefaultSubscriber
/**
 * Immutable value object describing a single contact.
 *
 * @property firstName the contact's first name.
 * @property lastName the contact's last name.
 * @property phoneNumber the contact's phone number, kept as text.
 */
data class Contact(
        val firstName: String,
        val lastName: String,
        val phoneNumber: String
)
fun save(contacts: Flowable<Contact>) =
contacts
.observeOn(Schedulers.io())
.subscribe(object : DefaultSubscriber<Contact>() {
override fun onStart() = request(1)
override fun onNext(contact: Contact) =
openConnection<Contact>().use {
it.write(contact)
request(1)
override fun onStart() = request(1)