Skip to content

Instantly share code, notes, and snippets.

@jeremiegirault
Last active December 7, 2016 09:11
Show Gist options
  • Save jeremiegirault/358d0c50f9c98dc4c23479a220fa3f62 to your computer and use it in GitHub Desktop.
Save jeremiegirault/358d0c50f9c98dc4c23479a220fa3f62 to your computer and use it in GitHub Desktop.
Rx state consolidation
import Foundation
import RxSwift
/// A minimal reference-type wrapper around a single mutable value.
///
/// Because this is a class, every holder of the same instance observes
/// mutations made through any other holder.
fileprivate final class MutableReference<T> {
    /// The wrapped value; freely readable and writable.
    var value: T

    /// Creates a reference holding `value`.
    init(_ value: T) {
        self.value = value
    }
}
/// Pairs a source observable with a closure that mutates a `State` value
/// for every element the source emits.
struct SideEffect<State> {
    /// Builds the stream for a given state holder and scheduler. The returned
    /// observable emits `Void` once per source element, after the update
    /// closure has been applied to the held state.
    fileprivate let observable: (MutableReference<State>, SchedulerType) -> Observable<Void>

    /// Creates a side effect.
    ///
    /// - Parameters:
    ///   - observable: The source whose elements trigger state updates.
    ///   - update: Mutates the state in place using the emitted element.
    init<Observed>(_ observable: Observable<Observed>, _ update: @escaping (inout State, Observed) -> Void) {
        self.observable = { box, scheduler in
            // Hop onto the supplied scheduler before touching the state.
            let scheduled = observable.observeOn(scheduler)
            let applied = scheduled.do(onNext: { element in
                update(&box.value, element)
            })
            // The element itself is discarded; only the "state changed" signal is kept.
            return applied.map { _ in () }
        }
    }
}
extension Observable {
    /// Wraps this observable as a `SideEffect` that applies `update` to a
    /// state value for every element emitted.
    ///
    /// - Parameter update: Mutates the state in place using the emitted element.
    /// - Returns: A `SideEffect` built from this observable and `update`.
    func asSideEffect<State>(_ update: @escaping (inout State, Element) -> Void) -> SideEffect<State> {
        return SideEffect(self, update)
    }

    /// Builds an observable of state snapshots driven by `sideEffects`.
    ///
    /// On each subscription, `initialValue` is evaluated once to seed a fresh
    /// state holder and a new serial scheduler is created. Every side effect
    /// mutates that state on the scheduler, and a snapshot of the state is
    /// emitted after each mutation. The initial value itself is not emitted:
    /// nothing is sent until a side effect produces an element.
    ///
    /// - Parameters:
    ///   - initialValue: Produces the starting state; evaluated per subscription.
    ///   - sideEffects: The state-mutating streams to merge.
    /// - Returns: An observable emitting the state after each side-effect update.
    static func consolidate(_ initialValue: @escaping @autoclosure () -> Element,
                            with sideEffects: SideEffect<Element>...) -> Observable<Element> {
        return Observable.create { observer in
            let scheduler = SerialDispatchQueueScheduler(qos: .default)
            let state = MutableReference(initialValue())

            // Take the snapshot inside each side-effect stream, i.e. in the same
            // scheduled action that performed the mutation. Reading the state
            // only after the final `observeOn` hop would let another queued
            // update run in between, so a subscriber could see the same state
            // twice and miss the intermediate one.
            let snapshots = sideEffects.map { sideEffect in
                sideEffect.observable(state, scheduler).map { _ -> Element in state.value }
            }

            // The final `observeOn` is kept so every event, including completion
            // when there are no side effects, is still delivered on the scheduler.
            return Observable<Observable<Element>>.from(snapshots)
                .merge()
                .observeOn(scheduler)
                .subscribe(observer)
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment