Skip to content

Instantly share code, notes, and snippets.

@sergiocasero
Last active October 24, 2018 15:42
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sergiocasero/8fe81c38a7c86d2829e72d0831917890 to your computer and use it in GitHub Desktop.
Save sergiocasero/8fe81c38a7c86d2829e72d0831917890 to your computer and use it in GitHub Desktop.
Maybe extension function to save data and return the same value (without blocking the stream)
/**
 * Wraps this [Maybe] so that [saveFunction] is invoked with the success value
 * before that same value is handed on to the subscriber.
 *
 * @param saveFunction side-effecting callback that receives the success value.
 * @return a [Maybe] that relays the signals of the receiver.
 */
fun <T> Maybe<T>.save(saveFunction: (T) -> Unit): Maybe<T> =
    MaybeSave(this, saveFunction)
/**
 * [Maybe] operator that runs [saveFunction] on the upstream success value and
 * then relays that same value downstream. Error and completion signals are
 * forwarded untouched.
 *
 * @param source upstream whose signals are relayed.
 * @param saveFunction callback invoked with the success value before it is forwarded.
 */
private class MaybeSave<T>(
    private val source: MaybeSource<T>,
    private val saveFunction: (T) -> Unit
) : Maybe<T>() {

    /** Subscribes to [source] through an observer that sits between it and [observer]. */
    override fun subscribeActual(observer: MaybeObserver<in T>) {
        source.subscribe(MaybeSaveObserver<T>(observer, saveFunction))
    }

    /**
     * Observer placed between the upstream and [downstream]. It hands itself to
     * [downstream] as the [Disposable], so disposal requests pass through it on
     * their way to the upstream.
     */
    internal class MaybeSaveObserver<T>(
        private val downstream: MaybeObserver<in T>,
        private val saveFunction: (T) -> Unit
    ) : MaybeObserver<T>, Disposable {

        // Upstream connection; null until onSubscribe, DISPOSED marker after dispose().
        private var upstream: Disposable? = null

        override fun dispose() {
            val current = upstream
            // Swap in the DISPOSED marker before disposing the real upstream.
            upstream = DisposableHelper.DISPOSED
            current?.dispose()
        }

        // NOTE(review): DEFAULT_BOOLEAN is declared outside this block; it is the value
        // reported while no upstream has been set yet (presumably false - confirm).
        override fun isDisposed(): Boolean = upstream?.isDisposed ?: DEFAULT_BOOLEAN

        override fun onSubscribe(d: Disposable) {
            if (DisposableHelper.validate(upstream, d)) {
                upstream = d
                downstream.onSubscribe(this)
            }
        }

        /**
         * Runs [saveFunction] and then forwards [value]. If [saveFunction] throws,
         * the failure is delivered through [MaybeObserver.onError] instead of
         * escaping this callback, so downstream still receives exactly one
         * terminal signal. Fatal errors are rethrown by `throwIfFatal`.
         */
        override fun onSuccess(value: T) {
            try {
                saveFunction(value)
            } catch (ex: Throwable) {
                io.reactivex.exceptions.Exceptions.throwIfFatal(ex)
                downstream.onError(ex)
                return
            }
            downstream.onSuccess(value)
        }

        override fun onError(e: Throwable) = downstream.onError(e)

        override fun onComplete() = downstream.onComplete()
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment