Skip to content

Instantly share code, notes, and snippets.

@luks91
Last active November 23, 2017 14:15
Show Gist options
  • Save luks91/a9a69f8f10ea8d4c03a0d9702a2ce59d to your computer and use it in GitHub Desktop.
Save luks91/a9a69f8f10ea8d4c03a0d9702a2ce59d to your computer and use it in GitHub Desktop.
package com.github.luks91.streamnesting
import android.util.Log
import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers
import io.reactivex.subscribers.DefaultSubscriber
/**
 * A single contact entry.
 *
 * @property firstName the contact's first name
 * @property lastName the contact's last name
 * @property phoneNumber the contact's phone number, kept as text
 */
data class Contact(
    val firstName: String,
    val lastName: String,
    val phoneNumber: String
)
/**
 * Persists a stream of [Contact]s, pulling one element at a time from upstream.
 *
 * The database access in this class is simulated with [Thread.sleep] and log statements.
 */
class Repository {

    /**
     * Subscribes to [contacts] and writes every emitted contact through a [Connection].
     *
     * Delivery is moved onto [Schedulers.io] and elements are requested one at a time,
     * so the next contact is only requested after the previous one has been handled.
     * A fresh connection is opened and closed for each contact.
     *
     * Upstream errors, as well as exceptions raised while writing a contact, are logged.
     * After a failed write the subscription is cancelled and no further contacts are requested.
     *
     * @param contacts the stream of contacts to store
     */
    fun save(contacts: Flowable<Contact>) =
        contacts
            .observeOn(Schedulers.io())
            .subscribe(object : DefaultSubscriber<Contact>() {
                override fun onStart() = request(1)

                override fun onNext(contact: Contact) {
                    try {
                        openConnection<Contact>().use { connection -> connection.write(contact) }
                    } catch (e: Exception) {
                        // An exception must not escape onNext (Reactive Streams rule 2.13):
                        // stop the upstream and report through the regular error path instead.
                        cancel()
                        onError(e)
                        return
                    }
                    // Ask for the next contact only once this one is written and its connection closed.
                    request(1)
                }

                override fun onComplete() {}

                override fun onError(t: Throwable) {
                    Log.e(TAG, "Oops!", t)
                }
            })

    /**
     * Simulates opening a database connection: logs, blocks the calling thread
     * for two seconds, then returns a new [Connection].
     */
    fun <T> openConnection(): Connection<T> {
        Log.i(TAG, "Opening database...")
        Thread.sleep(2000)
        return Connection()
    }

    /**
     * Simulated database connection that accepts values of type [T].
     *
     * Intended to be used with `use { }` so that [close] always runs.
     */
    class Connection<in T> : AutoCloseable {

        /** Simulates storing [data]: blocks briefly, then logs the value. */
        fun write(data: T) {
            Thread.sleep(10)
            Log.i(TAG, "Saving: $data")
        }

        /** Simulates closing the connection: logs, then blocks for two seconds. */
        override fun close() {
            Log.i(TAG, "Closing database...")
            Thread.sleep(2000)
        }
    }

    private companion object {
        /** Log tag shared by every message emitted from this class. */
        const val TAG = "TAG"
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment