@joshuajhomann
Last active September 18, 2019 00:21
import Foundation
import RxSwift
import PlaygroundSupport

// Keep the playground running so the timers can fire.
PlaygroundPage.current.needsIndefiniteExecution = true

// A feed event: either a full replacement array or a change to one element.
enum FeedElement {
    case array([String])
    case delta(Int, String)
}

// Emits a full array every 3.5 seconds, cycling through `sequences`.
let arraySource = Observable<[String]>.create { observer in
    let sequences: [[String]] = [["a", "b", "c"], ["a", "c"], ["b", "c"]]
    var index = 0
    let timer = Timer.scheduledTimer(withTimeInterval: 3.5, repeats: true) { _ in
        observer.onNext(sequences[index])
        index = (index + 1) % sequences.count
    }
    return Disposables.create {
        timer.invalidate()
    }
}

// Emits an (index, suffix) pair every second, cycling through `pairs`.
let deltaSource = Observable<(Int, String)>.create { observer in
    let pairs: [(Int, String)] = [(0, "x"), (1, "y"), (2, "z")]
    var index = 0
    let timer = Timer.scheduledTimer(withTimeInterval: 1, repeats: true) { _ in
        observer.onNext(pairs[index])
        index = (index + 1) % pairs.count
    }
    return Disposables.create {
        timer.invalidate()
    }
}

// Merge both sources into one stream and fold it into the latest array.
let merged = Observable.merge(
    arraySource.map(FeedElement.array),
    deltaSource.map { FeedElement.delta($0.0, $0.1) }
)
.scan(into: [String]()) { lastArray, feedElement in
    switch feedElement {
    case .array(let array):
        // A full array replaces the accumulated state.
        lastArray = array
    case .delta(let index, let itemToAppend):
        // Ignore deltas that point outside the current array.
        guard lastArray.indices.contains(index) else {
            return
        }
        lastArray[index] += itemToAppend
    }
}
.distinctUntilChanged()

let disposable = merged
    .subscribe(onNext: { array in
        print(array)
    })
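
The accumulator passed to scan(into:) above can also be exercised without RxSwift or timers, which makes its behavior easier to check. The following is a minimal sketch under that assumption: `apply` and `states(for:)` are hypothetical helper names that do not appear in the gist, and the loop only imitates scan by recording the accumulated array after every element.

```swift
enum FeedElement {
    case array([String])
    case delta(Int, String)
}

// Hypothetical helper: the same logic as the closure passed to scan(into:).
func apply(_ element: FeedElement, to lastArray: inout [String]) {
    switch element {
    case .array(let array):
        // A full array replaces the accumulated state.
        lastArray = array
    case .delta(let index, let itemToAppend):
        // Out-of-range deltas are ignored.
        guard lastArray.indices.contains(index) else { return }
        lastArray[index] += itemToAppend
    }
}

// Hypothetical helper: imitates scan by returning the state after each element.
func states(for elements: [FeedElement]) -> [[String]] {
    var current: [String] = []
    var result: [[String]] = []
    for element in elements {
        apply(element, to: &current)
        result.append(current)
    }
    return result
}

let history = states(for: [
    .delta(0, "x"),           // ignored: the array is still empty
    .array(["a", "b", "c"]),
    .delta(1, "y"),
    .array(["a", "c"]),
    .delta(2, "z")            // ignored: index 2 is out of range
])
history.forEach { print($0) }
```

The last two states are both ["a", "c"], which is the kind of repeat that distinctUntilChanged() filters out in the gist.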
@joshuajhomann

You need to handle your disposables one way or another. I prefer takeUntil(rx.deallocated), the Rx clone of ReactiveSwift's take(during: reactive.lifetime), over the older disposeBag, but either one works.

@MattyAyOh

ah never seen take before, I also like that better, it's more explicit about when the subscription is being released

@MattyAyOh

Hm so just tried using takeUntil, and experienced what was described in this closed issue: ReactiveX/RxSwift#1316

I get a warning that the result of subscribe is unused

    tableView.rx.contentOffset
    .takeUntil(self.rx.deallocated)
    .subscribe(onNext: { [weak self] offset in

    })

so in the interim I'm just switching back to disposed(by:) chained after the subscribe
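
For reference, the warning appears because subscribe returns a Disposable and the snippet does not use it. A rough sketch of the two ways to handle that is below, assuming RxSwift 4 or 5 with RxCocoa (in RxSwift 6 the operator is spelled take(until:)). `OffsetWatcher`, `received`, and the two `watch` methods are hypothetical names standing in for the view controller and the contentOffset subscription.

```swift
import Foundation
import RxSwift
import RxCocoa

// Hypothetical owner type standing in for the view controller in the thread.
final class OffsetWatcher: NSObject {
    private let disposeBag = DisposeBag()
    private(set) var received: [Int] = []

    // Style 1: takeUntil ends the sequence when self is deallocated.
    // Assigning to `_` discards the Disposable on purpose and silences the warning.
    func watchUntilDeallocated(_ offsets: Observable<Int>) {
        _ = offsets
            .takeUntil(rx.deallocated)
            .subscribe(onNext: { [weak self] offset in
                self?.received.append(offset)
            })
    }

    // Style 2: the bag disposes the subscription when it is deallocated with self.
    func watchWithBag(_ offsets: Observable<Int>) {
        offsets
            .subscribe(onNext: { [weak self] offset in
                self?.received.append(offset)
            })
            .disposed(by: disposeBag)
    }
}
```

Either way the subscription is tied to the owner's lifetime; the difference is only whether that is expressed in the operator chain or through the bag.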
