Last active
September 18, 2019 00:21
-
-
Save joshuajhomann/903c37a603d841f19bc062c5a9fcb26f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import RxSwift | |
import PlaygroundSupport | |
PlaygroundPage.current.needsIndefiniteExecution = true | |
enum FeedElement { | |
case array([String]) | |
case delta(Int, String) | |
} | |
let arraySource = Observable<[String]>.create { observer in | |
let sequences: [[String]] = [["a","b","c"], ["a","c"], ["b","c"]] | |
var index = 0; | |
let timer = Timer.scheduledTimer(withTimeInterval: 3.5, repeats: true) { timer in | |
observer.onNext(sequences[index]) | |
index += 1 | |
index %= sequences.count | |
} | |
return Disposables.create { | |
timer.invalidate() | |
} | |
} | |
let deltaSource = Observable<(Int, String)>.create { observer in | |
let pairs: [(Int, String)] = [(0,"x"), (1,"y"), (2, "z")] | |
var index = 0; | |
let timer = Timer.scheduledTimer(withTimeInterval: 1, repeats: true) { timer in | |
observer.onNext(pairs[index]) | |
index += 1 | |
index %= pairs.count | |
} | |
return Disposables.create { | |
timer.invalidate() | |
} | |
} | |
let merged = Observable.merge( | |
arraySource.map (FeedElement.array), | |
deltaSource.map { FeedElement.delta($0.0, $0.1)} | |
) | |
.scan(into: [String]() ) { lastArray, feedElement in | |
switch feedElement { | |
case .array(let array): | |
lastArray = array | |
case .delta(let index, let itemToAppend): | |
guard lastArray.indices ~= index else { | |
return | |
} | |
lastArray[index] = lastArray[index] + itemToAppend | |
} | |
} | |
.distinctUntilChanged() | |
let disposable = merged | |
.subscribe(onNext: { array in | |
print(array) | |
}) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hm so just tried using
takeUntil
, and experienced what was described in this closed issue: ReactiveX/RxSwift#1316I get a warning that the result of
subscribe
is unusedso in the interim I'm just switching back to
disposed(by:)
chained after thesubscribe