Skip to content

Instantly share code, notes, and snippets.

@sockeqwe
Created April 28, 2019 11:33
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save sockeqwe/46ae8c2a6c022ba692483423406c22cd to your computer and use it in GitHub Desktop.
Save sockeqwe/46ae8c2a6c022ba692483423406c22cd to your computer and use it in GitHub Desktop.
switchMap operator for Kotlin's Flow type.
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
/**
 * switchMap operator. Like RxJava's switchMap operator, it cancels the inner flow that is currently
 * being collected and starts collecting a new one every time upstream emits a value.
 *
 * For each upstream value the previously launched inner collection (if any) is cancelled and
 * joined, then [mapper] is invoked and the flow it returns is collected in a new child coroutine.
 * Only one inner flow is therefore collected at any time. After upstream completes, the returned
 * flow stays active until the last inner flow has completed as well.
 *
 * The implementation is based on [channelFlow], which places a buffer between the inner
 * collection and the downstream collector; apply the `buffer` operator to the result to tune it.
 *
 * @param mapper suspending function that turns an upstream value into the inner flow to collect.
 * @return a flow that emits the values of the most recently created inner flow.
 */
@FlowPreview
public fun <T, R> Flow<T>.switchMap(
    mapper: suspend (value: T) -> Flow<R>
): Flow<R> {
    // channelFlow (not flow) is required here: values are produced from a launched child
    // coroutine, and the plain flow builder forbids emitting from any coroutine other than
    // the one running the builder block.
    return channelFlow {
        var currentJob: Job? = null
        this@switchMap.collect { outerValue ->
            // Stop the previous inner flow before the (possibly slow) mapper runs, so no
            // stale values are produced while the replacement flow is being created.
            currentJob?.cancelAndJoin()
            val inner = mapper(outerValue)
            currentJob = launch {
                inner.collect { value ->
                    send(value)
                }
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment