Skip to content

Instantly share code, notes, and snippets.

@inamiy
Last active July 27, 2017 17:06
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save inamiy/31bd4c3480f9a4a6c2e945c211c7fed0 to your computer and use it in GitHub Desktop.
Save inamiy/31bd4c3480f9a4a6c2e945c211c7fed0 to your computer and use it in GitHub Desktop.
Rx "Hot -> Cold" + multiple subscription behavior
print("=== signal (hot) ===")
do {
let (signal, observer) = Signal<Int, NoError>.pipe()
let s = signal
.map { x -> Int in
print("map \(x)")
return x
}
s.observeValues { _ in }
s.observeValues { _ in } // subscribe twice
observer.send(value: 1)
observer.send(value: 2)
}
print("=== producer(signal) (warm) ===")
do {
let (signal, observer) = Signal<Int, NoError>.pipe()
let s = SignalProducer(signal)
.map { x -> Int in
print("map \(x)")
return x
}
s.startWithValues { _ in }
s.startWithValues { _ in } // subscribe twice
observer.send(value: 1)
observer.send(value: 2)
}
//=== signal (hot) ===
//map 1
//map 2
//=== producer(signal) (warm) ===
//map 1
//map 1
//map 2
//map 2
print("=== hot -> map (cold) ===")
do {
let pub = PublishSubject<Int>()
let o = pub
.map { x -> Int in
print("map \(x)")
return x
}
o.subscribe()
o.subscribe()
pub.onNext(1)
pub.onNext(2)
}
extension ObservableConvertibleType {
// Custom `map` using `Observable.create` (which is cold)
func map2<E2>(_ f: @escaping (E) -> E2) -> Observable<E2> {
return Observable.create { observer in
return self.asObservable()
.subscribe(onNext: { x in
observer.onNext(f(x))
}, onError: { e in
observer.onError(e)
}, onCompleted: {
observer.onCompleted()
})
}
}
}
print("=== hot -> map2 (same as map, cold) ===")
do {
let pub = PublishSubject<Int>()
let o = pub
.map { x -> Int in
print("map \(x)")
return x
}
o.subscribe()
o.subscribe()
pub.onNext(1)
pub.onNext(2)
}
print("=== hot -> Driver.map (hot) ===")
do {
let pub = PublishSubject<Int>()
let o = pub
.asDriver(onErrorDriveWith: .empty())
.map { x -> Int in
print("map \(x)")
return x
}
o.drive()
o.drive()
pub.onNext(1)
pub.onNext(2)
}
//=== hot -> map (cold) ===
//map 1
//map 1
//map 2
//map 2
//=== hot -> map2 (same as map, cold) ===
//map 1
//map 1
//map 2
//map 2
//=== hot -> Driver.map (hot) ===
//map 1
//map 2
@inamiy
Copy link
Author

inamiy commented Jul 26, 2017

In ReactiveSwift, Signal.map is already optimized as a hot-to-hot transformation, so duplicated map-closure will not be invoked on multiple subscriptions.
In RxSwift, Observable.map is only implemented as hot-to-cold, so there always need a verbose re-sharing code before every streamline's diverging point, e.g. .shareReplayLatestWhileConnected().

@inamiy
Copy link
Author

inamiy commented Jul 26, 2017

It's worth noting that RxSwift's .shareReplayLatestWhileConnected needs an internal cache for replaying, whereas ReactiveSwift doesn't require one.

@inamiy
Copy link
Author

inamiy commented Jul 27, 2017

Updated RxSwift's hot -> Driver.map example.
Now it seems to work as "hot-to-hot" transformation (with a limitation of main-thread only).

But be aware that:

let o = pub
    .asDriver(onErrorDriveWith: .empty())
    .map { ... }.map { ... }.map { ... }

is essentially same as:

let o = pub
    .shareReplayLatestWhileConnected()
    .map { ... }.shareReplayLatestWhileConnected()
    .map { ... }.shareReplayLatestWhileConnected()
    .map { ... }.shareReplayLatestWhileConnected()

so that each shareReplayLatestWhileConnected() will have a cached latest element if some observer is subscribed (drived).
This is a problem of RxSwift.Driver having cached data for every single Rx operator.

@sergdort
Copy link

sergdort commented Jul 27, 2017

Hi, @inamiy
Based of my knowledge of Rx :)

I feel like you think about share.. operators from the wrong perspective. They are designed to share subscription not a sequence.
What I I'm trying to say is how Map (or any other operator) works, is that internally it subscribes to the source, and this is what you want to share :)

So if you do is :

let o = pub
    .map { ... }
    .map { ... }
    .map { ... }
    .share() //(or pick any other sharing strategy)

I believe it should work.

From my understanding of how Rx is designed is that subscription is "in charge". That's why it's called Observable sequence you can "start" observing it by subscribing to it. And share.. operators there are just an optimisation operators e.g so you don't do the same work for new subscription.

Hope it make sense :) And please correct me if was wrong at some point!

@inamiy
Copy link
Author

inamiy commented Jul 27, 2017

Hi @sergdort

You are right, and I also think it's wise to use share only at the bottommost observable pipelining just before subscription (if possible).

But as far as I see Driver operator impls e.g. Driver.map, I don't think its concept is limited to such use case only.
I found it a problem since users can easily get trapped by excessive caching (even with the help of .whileConnected).

I personally think Driver operators should not be used for memory efficiency, and only use .drive() for UI data-binding.

(But maybe I'm too nervous about memory usage!)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment