Skip to content

Instantly share code, notes, and snippets.

@nsk-mironov
Created May 16, 2017 07:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nsk-mironov/280e5c5e82c6ce1c430d40150a16fe26 to your computer and use it in GitHub Desktop.
Save nsk-mironov/280e5c5e82c6ce1c430d40150a16fe26 to your computer and use it in GitHub Desktop.
ObservableMap.kt
import io.reactivex.Observable
import io.reactivex.ObservableSource
import io.reactivex.Observer
import io.reactivex.internal.fuseable.HasUpstreamObservableSource
import io.reactivex.internal.fuseable.QueueFuseable
import io.reactivex.internal.observers.BasicFuseableObserver
/**
 * Base class for an [Observable] that transforms every item of an upstream
 * [ObservableSource] with [onMap] and takes part in operator fusion through
 * [BasicFuseableObserver].
 *
 * Subclasses supply the transformation by implementing [onMap]; [create] builds an
 * instance from a lambda.
 *
 * @param T type of the items emitted by the upstream source.
 * @param U type of the items emitted by this observable.
 * @param source the upstream the resulting observable subscribes to.
 */
abstract class BaseObservableMap<T : Any, U : Any> protected constructor(
    private val source: ObservableSource<T>
) : Observable<U>(), HasUpstreamObservableSource<T> {

    /** Returns the upstream source this operator was created with. */
    override fun source(): ObservableSource<T> = source

    /** Subscribes to the upstream with a [MapObserver] that wraps [observer]. */
    override fun subscribeActual(observer: Observer<in U>) {
        source.subscribe(MapObserver(observer))
    }

    /**
     * Transforms a single upstream item.
     *
     * An exception thrown from here while handling `onNext` is routed to
     * `fail(...)` of the observer instead of propagating to the caller.
     *
     * @param value the upstream item.
     * @return the item to pass downstream.
     */
    protected abstract fun onMap(value: T): U

    /**
     * Observer that applies [onMap] either on the push path ([onNext]) or, when a
     * fusion mode has been established, on the pull path ([poll]).
     */
    private inner class MapObserver(actual: Observer<in U>) : BasicFuseableObserver<T, U>(actual) {

        // NOTE(review): an upstream running in a fused mode may invoke onNext with a
        // null placeholder; confirm the non-null `value: T` parameter tolerates that
        // with the RxJava / Kotlin versions in use.
        override fun onNext(value: T) {
            if (done) {
                return
            }

            if (sourceMode != QueueFuseable.NONE) {
                // A fusion mode is active: no mapping happens on this path. Downstream
                // only gets a null notification; mapped values are served by poll().
                actual.onNext(null)
                return
            }

            val mapped = try {
                onMap(value)
            } catch (ex: Throwable) {
                // Hand the mapper failure to the base class instead of letting it escape.
                fail(ex)
                return
            }

            actual.onNext(mapped)
        }

        /** Delegates fusion negotiation to the boundary-aware helper of the base class. */
        override fun requestFusion(mode: Int): Int = transitiveBoundaryFusion(mode)

        /**
         * Pulls the next item from the fused upstream queue and maps it.
         *
         * @return the mapped item, or `null` when the upstream queue returned `null`
         * (nothing available), in which case [onMap] is not invoked.
         */
        override fun poll(): U? {
            // A null from the upstream queue is passed through as-is; it must never
            // reach onMap, which only accepts non-null items.
            val upstreamItem = qs.poll() ?: return null
            return onMap(upstreamItem)
        }
    }

    companion object {
        /**
         * Creates a mapping observable over [source] that uses [mapper] as the
         * [onMap] implementation.
         *
         * @param source the upstream to transform.
         * @param mapper function applied to every upstream item.
         * @return an [Observable] emitting the mapped items.
         */
        inline fun <T : Any, U : Any> create(source: ObservableSource<T>, crossinline mapper: (T) -> U): Observable<U> {
            return object : BaseObservableMap<T, U>(source) {
                override fun onMap(value: T) = mapper(value)
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment