Skip to content

Instantly share code, notes, and snippets.

@charlag
Created September 10, 2017 10:14
Show Gist options
  • Save charlag/09cf31933275cae2ff6056d6989b3c2c to your computer and use it in GitHub Desktop.
Save charlag/09cf31933275cae2ff6056d6989b3c2c to your computer and use it in GitHub Desktop.
fun <S, E> createConnectableKnot(
initial: S,
eventsSource: Observable<E>,
reducer: (S, E) -> S,
transformer: ObservableTransformer<in Triple<S, S, E>, out E>
): ConnectableObservable<E> {
return Observable.create<E> { observer ->
val state = behaviorSubjectDefault(initial)
val events = publishSubject<E>()
events.withLatestFrom(state, BiFunction<E, S, Triple<S, S, E>> { ev, oldState ->
val newState = reducer(oldState, ev)
state.onNext(newState)
Triple(oldState, newState, ev)
})
.compose(transformer)
.doOnError { RxJavaPlugins.onError(it) }
.subscribe { t ->
observer.onNext(t)
events.onNext(t)
}
eventsSource.subscribe(events)
}
.publish()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment