Last active
September 18, 2019 00:21
-
-
Save joshuajhomann/903c37a603d841f19bc062c5a9fcb26f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import RxSwift | |
import PlaygroundSupport | |
PlaygroundPage.current.needsIndefiniteExecution = true | |
enum FeedElement { | |
case array([String]) | |
case delta(Int, String) | |
} | |
let arraySource = Observable<[String]>.create { observer in | |
let sequences: [[String]] = [["a","b","c"], ["a","c"], ["b","c"]] | |
var index = 0; | |
let timer = Timer.scheduledTimer(withTimeInterval: 3.5, repeats: true) { timer in | |
observer.onNext(sequences[index]) | |
index += 1 | |
index %= sequences.count | |
} | |
return Disposables.create { | |
timer.invalidate() | |
} | |
} | |
let deltaSource = Observable<(Int, String)>.create { observer in | |
let pairs: [(Int, String)] = [(0,"x"), (1,"y"), (2, "z")] | |
var index = 0; | |
let timer = Timer.scheduledTimer(withTimeInterval: 1, repeats: true) { timer in | |
observer.onNext(pairs[index]) | |
index += 1 | |
index %= pairs.count | |
} | |
return Disposables.create { | |
timer.invalidate() | |
} | |
} | |
let merged = Observable.merge( | |
arraySource.map (FeedElement.array), | |
deltaSource.map { FeedElement.delta($0.0, $0.1)} | |
) | |
.scan(into: [String]() ) { lastArray, feedElement in | |
switch feedElement { | |
case .array(let array): | |
lastArray = array | |
case .delta(let index, let itemToAppend): | |
guard lastArray.indices ~= index else { | |
return | |
} | |
lastArray[index] = lastArray[index] + itemToAppend | |
} | |
} | |
.distinctUntilChanged() | |
let disposable = merged | |
.subscribe(onNext: { array in | |
print(array) | |
}) | |
Hm so just tried using takeUntil
, and experienced what was described in this closed issue: ReactiveX/RxSwift#1316
I get a warning that the result of subscribe
is unused
tableView.rx.contentOffset
.takeUntil(self.rx.deallocated)
.subscribe(onNext: { [weak self] offset in
})
so in the interim I'm just switching back to disposed(by:)
chained after the subscribe
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
ah never seen
take
before, I also like that better, it's more explicit about when the subscription is being released