Skip to content

Instantly share code, notes, and snippets.

@leandropjp
Forked from pgherveou/RxSyncSequence.swift
Created October 7, 2019 17:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save leandropjp/b77495a5397f7a1158d73ed04aa79caa to your computer and use it in GitHub Desktop.
Save leandropjp/b77495a5397f7a1158d73ed04aa79caa to your computer and use it in GitHub Desktop.
import PlaygroundSupport
import RxSwift
PlaygroundPage.current.needsIndefiniteExecution = true
extension ObservableType where E: Sequence {
typealias T = E.Iterator.Element
/// Create an observable which is an Array of the projected values
/// the operator produce an array of the same size than the original sequence
/// and will generate new array every time a new item is emitted
func sync<V>(project: @escaping (T) -> Observable<V> ) -> Observable<[V]> {
typealias Pair = (value: V, index: Int)
return self
.flatMapLatest { (seq) -> Observable<[V]> in
// get number of element in sequence
let count = Array(seq).count
var buffer = Dictionary<Int, V>(minimumCapacity: count)
return Observable
// convert sequence to observable
.from(seq)
// flatMap into Pair
.flatMapWithIndex { (item: T, index: Int) -> Observable<Pair> in
return Observable.combineLatest(project(item), Observable.just(index)) { (value: $0, index:$1) }
}
// reduce into a dictionnary using the index as a key
// Scan might be more approriate here but it would create a new [Int:V] dictionary
// for every iteration, since the accumulator needs to be immutable
.map { (item) -> [Int: V] in
buffer[item.index] = item.value
return buffer
}
// filter until we get a dictionary of the same size than the original sequence
.filter { $0.count == count }
// map to an array
.map { dic in
return Array(0..<count).map { dic[$0]! }
}
}
}
}
struct Entity {
let id: String
let version: Int
}
func observeEntity(withId id: String) -> Observable<Entity> {
return Observable<Int>
.interval(1, scheduler: MainScheduler.instance)
.take(2)
.map { Entity(id: id, version: $0) }
}
let src = ["id:1", "id:2", "id:3"].map(observeEntity)
Observable
.of(["id:1", "id:2", "id:3"])
.sync(project: observeEntity)
.subscribe(
onNext: { print($0) },
onCompleted: { print("completed")
})
// print
//[Entity(id: "id:1", version: 0), Entity(id: "id:2", version: 0), Entity(id: "id:3", version: 0)]
//[Entity(id: "id:1", version: 1), Entity(id: "id:2", version: 0), Entity(id: "id:3", version: 0)]
//[Entity(id: "id:1", version: 1), Entity(id: "id:2", version: 1), Entity(id: "id:3", version: 0)]
//[Entity(id: "id:1", version: 1), Entity(id: "id:2", version: 1), Entity(id: "id:3", version: 1)]
//completed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment