Last active
September 18, 2019 00:21
-
-
Save joshuajhomann/903c37a603d841f19bc062c5a9fcb26f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import RxSwift | |
import PlaygroundSupport | |
PlaygroundPage.current.needsIndefiniteExecution = true | |
enum FeedElement { | |
case array([String]) | |
case delta(Int, String) | |
} | |
let arraySource = Observable<[String]>.create { observer in | |
let sequences: [[String]] = [["a","b","c"], ["a","c"], ["b","c"]] | |
var index = 0; | |
let timer = Timer.scheduledTimer(withTimeInterval: 3.5, repeats: true) { timer in | |
observer.onNext(sequences[index]) | |
index += 1 | |
index %= sequences.count | |
} | |
return Disposables.create { | |
timer.invalidate() | |
} | |
} | |
let deltaSource = Observable<(Int, String)>.create { observer in | |
let pairs: [(Int, String)] = [(0,"x"), (1,"y"), (2, "z")] | |
var index = 0; | |
let timer = Timer.scheduledTimer(withTimeInterval: 1, repeats: true) { timer in | |
observer.onNext(pairs[index]) | |
index += 1 | |
index %= pairs.count | |
} | |
return Disposables.create { | |
timer.invalidate() | |
} | |
} | |
let merged = Observable.merge( | |
arraySource.map (FeedElement.array), | |
deltaSource.map { FeedElement.delta($0.0, $0.1)} | |
) | |
.scan(into: [String]() ) { lastArray, feedElement in | |
switch feedElement { | |
case .array(let array): | |
lastArray = array | |
case .delta(let index, let itemToAppend): | |
guard lastArray.indices ~= index else { | |
return | |
} | |
lastArray[index] = lastArray[index] + itemToAppend | |
} | |
} | |
.distinctUntilChanged() | |
let disposable = merged | |
.subscribe(onNext: { array in | |
print(array) | |
}) | |
ah never seen take
before, I also like that better, it's more explicit about when the subscription is being released
Hm so just tried using takeUntil
, and experienced what was described in this closed issue: ReactiveX/RxSwift#1316
I get a warning that the result of subscribe
is unused
tableView.rx.contentOffset
.takeUntil(self.rx.deallocated)
.subscribe(onNext: { [weak self] offset in
})
so in the interim I'm just switching back to disposed(by:)
chained after the subscribe
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
You need to handle your disposables one way or another; I prefer the Rx clone
takeUntil(rx.deallocated)
of the ReactiveSwift versiontake(during: reactive.lifetime)
over the olderdisposeBag
, but either one works.