Last active
October 24, 2018 15:42
-
-
Save sergiocasero/8fe81c38a7c86d2829e72d0831917890 to your computer and use it in GitHub Desktop.
Maybe extension function to save data and return the same value (without blocking the stream)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Wraps this [Maybe] so that [saveFunction] is invoked with the success value before that
 * value is passed on to the subscriber.
 *
 * @param saveFunction side effect to run with the emitted value.
 * @return a [Maybe] that relays the signals of the receiver.
 */
fun <T> Maybe<T>.save(saveFunction: (T) -> Unit): Maybe<T> = MaybeSave(this, saveFunction)
/**
 * [Maybe] that subscribes to [source] through a [MaybeSaveObserver], which runs [saveFunction]
 * on the success value before relaying it to the real observer.
 *
 * @property source upstream whose signals are relayed.
 * @property saveFunction side effect invoked with the success value.
 */
private class MaybeSave<T>(val source: MaybeSource<T>, val saveFunction: (T) -> Unit) : Maybe<T>() {

    override fun subscribeActual(observer: MaybeObserver<in T>) {
        source.subscribe(MaybeSaveObserver<T>(observer, saveFunction))
    }

    /**
     * Observer placed between the upstream source and [downstream].
     *
     * It hands itself to [downstream] as the [Disposable], so disposing downstream
     * disposes the upstream subscription stored in [upstream].
     *
     * @property downstream observer that receives the relayed signals.
     * @property saveFunction side effect invoked with the success value.
     */
    internal class MaybeSaveObserver<T>(
        val downstream: MaybeObserver<in T>,
        val saveFunction: (T) -> Unit
    ) : MaybeObserver<T>, Disposable {

        private var upstream: Disposable? = null

        override fun dispose() {
            // Swap in the DISPOSED marker first, then dispose whatever was stored before.
            val d = this.upstream
            this.upstream = DisposableHelper.DISPOSED
            d?.dispose()
        }

        // NOTE(review): DEFAULT_BOOLEAN is declared outside this block; it is the value
        // reported before onSubscribe has stored an upstream — presumably false, confirm.
        override fun isDisposed(): Boolean = upstream?.isDisposed ?: DEFAULT_BOOLEAN

        override fun onSubscribe(d: Disposable) {
            // Only store the upstream and notify downstream when validate() accepts it.
            if (DisposableHelper.validate(this.upstream, d)) {
                this.upstream = d
                downstream.onSubscribe(this)
            }
        }

        /**
         * Runs [saveFunction] with [value], then forwards [value] downstream.
         *
         * If [saveFunction] throws an [Exception], it is delivered to [downstream] through
         * `onError` and `onSuccess` is not called, so the subscriber still receives exactly
         * one terminal signal instead of the exception escaping into the upstream caller.
         * [Error] subclasses are not caught and propagate as before.
         */
        override fun onSuccess(value: T) {
            try {
                saveFunction(value)
            } catch (failure: Exception) {
                downstream.onError(failure)
                return
            }
            downstream.onSuccess(value)
        }

        override fun onError(e: Throwable) = downstream.onError(e)

        override fun onComplete() = downstream.onComplete()
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment