Skip to content

Instantly share code, notes, and snippets.

@pgherveou
Last active December 20, 2017 16:36
Show Gist options
  • Save pgherveou/4e4cacd68f25e5eb3950816b2923dbcc to your computer and use it in GitHub Desktop.
Save pgherveou/4e4cacd68f25e5eb3950816b2923dbcc to your computer and use it in GitHub Desktop.
RxSwift + concatMap
import PlaygroundSupport
import RxSwift
// Ask the playground to keep running after the top-level code has finished,
// so the delayed callbacks scheduled by the sample below can still fire.
PlaygroundPage.current.needsIndefiniteExecution = true
/// ConcatMap Operator
/// Runs all projected observable sequences in parallel and concatenates their elements.
extension ObservableType {
    /// Projects each source element into an inner sequence, starts every inner
    /// sequence as soon as its source element arrives, and emits the inner
    /// elements concatenated in source order.
    ///
    /// Each inner sequence is wrapped with `replayAll()` and connected immediately,
    /// so elements it produces before its turn are buffered and replayed once
    /// `concat()` subscribes to it.
    ///
    /// The connections are owned by the returned subscription: disposing it (or
    /// the sequence terminating) also disposes every inner connection, instead of
    /// leaving them running with no way to cancel them.
    ///
    /// - Parameter project: Transform that creates the inner sequence for a source element.
    /// - Returns: An observable emitting the elements of all inner sequences, in source order.
    func concatMap<T>(project: @escaping (E) -> Observable<T>) -> Observable<T> {
        return Observable<T>.create { observer -> Disposable in
            // Collects the connection of every eagerly started inner sequence so
            // they are torn down together with the downstream subscription.
            let connections = CompositeDisposable()
            let subscription = self
                .map { element -> Observable<T> in
                    let replayed = project(element).replayAll()
                    // `insert` disposes the connection right away if the
                    // composite has already been disposed.
                    _ = connections.insert(replayed.connect())
                    return replayed.asObservable()
                }
                .concat()
                .subscribe(observer)
            return Disposables.create(subscription, connections)
        }
    }
}
/// Sample logger.
///
/// Prints each message prefixed with the number of seconds elapsed since the
/// logger was created, formatted with exactly one fraction digit and rounded down.
/// The reference date and the formatter are created once and captured by the
/// returned closure.
let logger: (String) -> Void = {
    let start = Date()
    let formatter = NumberFormatter()
    formatter.numberStyle = .decimal
    formatter.maximumFractionDigits = 1
    formatter.minimumFractionDigits = 1
    formatter.roundingMode = .down
    return { message in
        let elapsed = Date().timeIntervalSince(start)
        // `string(from:)` returns an optional; fall back to the raw value
        // rather than force-unwrapping.
        let time = formatter.string(from: NSNumber(value: elapsed)) ?? String(elapsed)
        print("[time: \(time)] \(message)")
    }
}()
/// Sample async String -> Observable<String>.
///
/// Creates an observable that, one second after subscription, emits
/// `"<task> processed"` on the main queue and then completes.
/// Disposing the subscription before the delay elapses cancels the pending work.
///
/// - Parameter task: Label of the task to "process".
/// - Returns: An observable emitting a single processed message.
func asyncWork(task: String) -> Observable<String> {
    return Observable.create { observer -> Disposable in
        let work = DispatchWorkItem {
            observer.onNext("\(task) processed")
            observer.onCompleted()
        }
        DispatchQueue.main.asyncAfter(deadline: .now() + 1, execute: work)
        // Cancel the scheduled work item if the subscriber goes away first.
        return Disposables.create { work.cancel() }
    }
}
/// Test ConcatOperator
/// Both tasks are started together, so their results are logged at about the same time.
logger("start")
_ = Observable.of("task 1", "task 2")
    .concatMap(project: { task in asyncWork(task: task) })
    .subscribe(onNext: logger)
// Expected output:
// [time: 0.0] start
// [time: 1.0] task 1 processed
// [time: 1.0] task 2 processed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment