Skip to content

Instantly share code, notes, and snippets.

@kakajika
Last active September 24, 2018 05:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kakajika/ed7a022556d0e79864acbc2b6ccb51ca to your computer and use it in GitHub Desktop.
Save kakajika/ed7a022556d0e79864acbc2b6ccb51ca to your computer and use it in GitHub Desktop.
A port of RxJS's expand to RxKotlin.
import io.reactivex.Flowable
import io.reactivex.Maybe
import io.reactivex.Observable
import io.reactivex.Single
import io.reactivex.rxkotlin.Flowables
import io.reactivex.rxkotlin.Maybes
import io.reactivex.rxkotlin.Observables
import io.reactivex.rxkotlin.Singles
/**
 * Recursively expands [source]: every value is emitted downstream and is also passed to
 * [expander], whose resulting stream is expanded in the same way.
 *
 * Each stream (the [source] and every stream returned by [expander]) is subscribed to exactly
 * once per subscription of the returned Observable, so side effects of cold sources such as
 * network requests are not repeated.
 *
 * A branch of the expansion ends when [expander] returns a stream that completes without
 * emitting. Values of concurrently active branches are merged, so their relative order is
 * not guaranteed; a value is always emitted before any value produced by expanding it.
 *
 * @param source the initial stream of values
 * @param expander maps an emitted value to the stream of values that follow from it
 * @return an Observable emitting the source values and all recursively expanded values
 */
fun <T> Observables.expand(source: Observable<T>, expander: (T) -> Observable<T>): Observable<T> {
    // defer keeps assembly of the chain lazy until a subscriber arrives.
    return Observable.defer {
        // A single flatMap both re-emits the item and attaches its expansion, so the
        // upstream is subscribed only once.
        source.flatMap { item ->
            Observable.just(item).concatWith(expand(expander(item), expander))
        }
    }
}
/**
 * Recursively expands [source]: every value is emitted downstream and is also passed to
 * [expander], whose resulting stream is expanded in the same way.
 *
 * Each stream (the [source] and every stream returned by [expander]) is subscribed to exactly
 * once per subscription of the returned Flowable, so side effects of cold sources such as
 * network requests are not repeated.
 *
 * A branch of the expansion ends when [expander] returns a stream that completes without
 * emitting. Values of concurrently active branches are merged, so their relative order is
 * not guaranteed; a value is always emitted before any value produced by expanding it.
 *
 * NOTE(review): `flatMap` is used with its default concurrency limit; confirm that is
 * acceptable for sources with a large number of long-lived expansions.
 *
 * @param source the initial stream of values
 * @param expander maps an emitted value to the stream of values that follow from it
 * @return a Flowable emitting the source values and all recursively expanded values
 */
fun <T> Flowables.expand(source: Flowable<T>, expander: (T) -> Flowable<T>): Flowable<T> {
    // defer keeps assembly of the chain lazy until a subscriber arrives.
    return Flowable.defer {
        // A single flatMap both re-emits the item and attaches its expansion, so the
        // upstream is subscribed only once.
        source.flatMap { item ->
            Flowable.just(item).concatWith(expand(expander(item), expander))
        }
    }
}
/**
 * [Single] flavour of `expand`: converts [source] and every [expander] result to a
 * [Flowable] and delegates to the `Flowables.expand` extension.
 *
 * Because a [Single] either produces a value or fails, every expanded value triggers another
 * expansion; the resulting stream ends only on error or when the subscriber cancels
 * (for example via `take` or `takeUntil`).
 *
 * @param source the initial single value
 * @param expander maps an emitted value to the [Single] that follows from it
 * @return a [Flowable] of the source value and all recursively expanded values
 */
fun <T> Singles.expand(source: Single<T>, expander: (T) -> Single<T>): Flowable<T> =
    Flowables.expand(source.toFlowable()) { value -> expander(value).toFlowable() }
/**
 * [Maybe] flavour of `expand`: converts [source] and every [expander] result to a
 * [Flowable] and delegates to the `Flowables.expand` extension.
 *
 * The expansion stops once [expander] returns a [Maybe] that completes without a value.
 *
 * @param source the initial optional value
 * @param expander maps an emitted value to the [Maybe] that follows from it
 * @return a [Flowable] of the source value and all recursively expanded values
 */
fun <T> Maybes.expand(source: Maybe<T>, expander: (T) -> Maybe<T>): Flowable<T> =
    Flowables.expand(source.toFlowable()) { value -> expander(value).toFlowable() }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment