Skip to content

Instantly share code, notes, and snippets.

@kgmyshin
Last active January 6, 2018 05:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kgmyshin/c8d075b3b3858782b45c472622707936 to your computer and use it in GitHub Desktop.
Save kgmyshin/c8d075b3b3858782b45c472622707936 to your computer and use it in GitHub Desktop.
toSingleOptional
/**
 * Converts this [Maybe] into a [Single] of [Optional].
 *
 * The receiver is wrapped in a [MaybeToSingleOptional] and the wrapper is passed through
 * [RxJavaPlugins.onAssembly] before being returned.
 *
 * @return the [Single] produced by [RxJavaPlugins.onAssembly] for the wrapping operator.
 */
fun <T> Maybe<T>.toSingleOptional(): Single<Optional<T>> {
    val operator = MaybeToSingleOptional<T>(this)
    return RxJavaPlugins.onAssembly(operator)
}
import com.annimon.stream.Optional
import io.reactivex.MaybeObserver
import io.reactivex.MaybeSource
import io.reactivex.Single
import io.reactivex.SingleObserver
import io.reactivex.disposables.Disposable
import io.reactivex.internal.disposables.DisposableHelper
import io.reactivex.internal.fuseable.HasUpstreamMaybeSource
/**
 * A [Single] that subscribes to a [MaybeSource] and reports its outcome as an [Optional]:
 * a success value becomes `Optional.of(value)`, completion without a value becomes
 * `Optional.empty()`, and an error is forwarded unchanged.
 *
 * @param source the upstream [MaybeSource] to subscribe to.
 */
class MaybeToSingleOptional<T>(
    private val source: MaybeSource<T>
) : Single<Optional<T>>(), HasUpstreamMaybeSource<T> {

    /** Returns the upstream [MaybeSource] passed to the constructor. */
    override fun source(): MaybeSource<T> = source

    /** Subscribes a [ToSingleOptionalMaybeSubscriber] wrapping [observer] to the upstream source. */
    override fun subscribeActual(observer: SingleObserver<in Optional<T>>) {
        val bridge = ToSingleOptionalMaybeSubscriber<T>(observer)
        source.subscribe(bridge)
    }

    /**
     * Bridges upstream [MaybeObserver] signals to the downstream [SingleObserver] and acts as
     * the [Disposable] handed to that downstream observer.
     *
     * @param actual the downstream observer that receives the [Optional] result or the error.
     */
    internal class ToSingleOptionalMaybeSubscriber<T>(
        private val actual: SingleObserver<in Optional<T>>
    ) : MaybeObserver<T>, Disposable {

        /** Upstream disposable; `null` until [onSubscribe] accepts one, then replaced by the DISPOSED marker. */
        var d: Disposable? = null

        /** Disposes the upstream (if one was set) and records the disposed state. */
        override fun dispose() {
            d?.dispose()
            markTerminated()
        }

        /** Reports the stored disposable's state; `false` while no disposable has been stored. */
        override fun isDisposed(): Boolean = d?.isDisposed == true

        /** Stores the upstream disposable and notifies downstream, but only if validation passes. */
        override fun onSubscribe(d: Disposable) {
            if (!DisposableHelper.validate(this.d, d)) {
                return
            }
            this.d = d
            actual.onSubscribe(this)
        }

        /** Upstream produced a value: deliver it downstream wrapped in a non-empty [Optional]. */
        override fun onSuccess(value: T) {
            markTerminated()
            actual.onSuccess(Optional.of(value))
        }

        /** Upstream failed: forward the error downstream as-is. */
        override fun onError(e: Throwable) {
            markTerminated()
            actual.onError(e)
        }

        /** Upstream completed without a value: deliver an empty [Optional] downstream. */
        override fun onComplete() {
            markTerminated()
            actual.onSuccess(Optional.empty())
        }

        /** Replaces the stored disposable with the shared DISPOSED marker. */
        private fun markTerminated() {
            d = DisposableHelper.DISPOSED
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment