@charlag
Created September 10, 2017 10:14
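// Imports assume RxJava 2 (io.reactivex.*).
import io.reactivex.Observable
import io.reactivex.ObservableTransformer
import io.reactivex.functions.BiFunction
import io.reactivex.plugins.RxJavaPlugins
import io.reactivex.subjects.BehaviorSubject
import io.reactivex.subjects.PublishSubject

// Ties external events and transformer-produced events into one stream (the "knot").
// eventsSource is subscribed before the loop is wired up, so events it emits
// synchronously on subscribe are dropped; a hot source is the safer choice.
fun <S, E> createKnot(
    initial: S,
    eventsSource: Observable<E>,
    reducer: (S, E) -> S,
    transformer: ObservableTransformer<in Triple<S, S, E>, out E>
): Observable<E> {
    // Standard RxJava 2 factories stand in for the undefined helpers
    // behaviorSubjectDefault/publishSubject (assumed to be thin wrappers).
    val state = BehaviorSubject.createDefault(initial)
    val events = PublishSubject.create<E>()
    eventsSource.subscribe(events::onNext)
    events.withLatestFrom(state, BiFunction<E, S, Triple<S, S, E>> { ev, oldState ->
        // Reduce the event into a new state and publish it before emitting.
        val newState = reducer(oldState, ev)
        state.onNext(newState)
        Triple(oldState, newState, ev)
    })
        .compose(transformer)
        .doOnError { RxJavaPlugins.onError(it) }
        // Feed events produced by the transformer back into the loop.
        .subscribe(events)
    return events
}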
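
The feedback loop in createKnot can be hard to picture from the Rx operators alone. Below is a minimal, RxJava-free sketch of the same idea, under stated assumptions: `runKnot`, `CounterEvent`, and the `effects` parameter are hypothetical names introduced only for illustration, and a plain list used as a queue replaces the subjects. It is a simplification, not the gist's implementation: follow-up events are queued behind pending inputs here, whereas the subject-based version delivers them re-entrantly.

```kotlin
// Hypothetical synchronous model of the createKnot loop: each event is reduced
// into a new state, then (oldState, newState, event) is handed to `effects`,
// whose returned events are fed back into the same queue.
fun <S, E> runKnot(
    initial: S,
    inputs: List<E>,
    reducer: (S, E) -> S,
    effects: (Triple<S, S, E>) -> List<E>
): Pair<S, List<E>> {
    var state = initial
    val seen = mutableListOf<E>()        // every event that passed through the loop
    val queue = inputs.toMutableList()   // pending events, oldest first
    while (queue.isNotEmpty()) {
        val event = queue.removeAt(0)
        seen += event
        val newState = reducer(state, event)
        queue.addAll(effects(Triple(state, newState, event)))
        state = newState
    }
    return state to seen
}

// Illustrative event type, not part of the original gist.
sealed class CounterEvent {
    object Increment : CounterEvent()
    object LimitReached : CounterEvent()
}

fun main() {
    val (finalState, seen) = runKnot<Int, CounterEvent>(
        initial = 0,
        inputs = listOf(CounterEvent.Increment, CounterEvent.Increment),
        reducer = { count, ev -> if (ev is CounterEvent.Increment) count + 1 else count },
        effects = { (_, newCount, ev) ->
            // Emit a follow-up event once the counter reaches 2.
            if (ev is CounterEvent.Increment && newCount == 2) {
                listOf(CounterEvent.LimitReached)
            } else {
                emptyList()
            }
        }
    )
    println(finalState)  // prints 2
    println(seen.size)   // prints 3 (two Increment events plus LimitReached)
}
```

The `effects` function plays the role of the `transformer` argument: it sees the state before and after each event and decides which new events, if any, re-enter the loop.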