https://speakerdeck.com/dameleon/dive-to-reactive-extension-with-swift
上記の資料内から、説明に対応するサンプルコードと出力を以下に記載する
let completeObservable = Observable<String>.create { (observer: AnyObserver<String>) -> Disposable in
observer.onNext("hoge")
observer.onNext("fuga")
observer.onCompleted()
observer.onNext("piyo")
return Disposables.create {
print("disposed")
}
}
let disposable = completeObservable.subscribe(
onNext: { (value) in
print("completeObservable onNext: \(value)")
},
onError: { (err) in
print("completeObservable onError: \(err)")
},
onCompleted: {
print("completeObservable onCompleted()")
},
onDisposed: {
print("completeObservable onDisposed()")
})
disposable.dispose()
※構造が分かりやすいように型注釈を書いているが、型推論されるので(observer: AnyObserver<String>) -> Disposable
の指定は不要
completeObservable onNext: hoge
completeObservable onNext: fuga
completeObservable onCompleted()
completeObservable onDisposed()
disposed
- 完了後に
onNext("piyo")
したものはストリームに流れない
enum SampleError: Error {
case Test
}
let errorObservable = Observable<String>.create { (observer: AnyObserver<String>) -> Disposable in
observer.onNext("hoge")
observer.onNext("fuga")
observer.onError(SampleError.Test)
observer.onNext("piyo")
return Disposables.create {
print("disposed")
}
}
let disposable = errorObservable.subscribe(
onNext: { (value) in
print("errorObservable onNext: \(value)")
},
onError: { (err) in
print("errorObservable onError: \(err)")
},
onCompleted: {
print("errorObservable onCompleted()")
},
onDisposed: {
print("errorObservable onDisposed()")
})
disposable.dispose()
errorObservable onNext: hoge
errorObservable onNext: fuga
errorObservable onError: Test
errorObservable onDisposed()
disposed
- エラー発生後に
onNext("piyo")
したものはストリームに流れない
func asyncTask() -> Observable<String> {
let subject = PublishSubject<String>()
DispatchQueue.global().asyncAfter(deadline: .now() + 1) {
subject.onNext("hoge")
}
DispatchQueue.global().asyncAfter(deadline: .now() + 2) {
subject.onNext("fuga")
}
DispatchQueue.global().asyncAfter(deadline: .now() + 3) {
subject.onNext("piyo")
}
DispatchQueue.global().asyncAfter(deadline: .now() + 4) {
subject.onCompleted()
}
return subject.asObservable()
}
// main
let disposeBag = DisposeBag()
let task = asyncTask()
task.subscribe(
onNext: { (value) in
print("A onNext: \(value)")
},
onCompleted: {
print("A onCompleted()")
},
onDisposed: {
print("A onDisposed()")
})
.addDisposableTo(disposeBag)
Thread.sleep(forTimeInterval: 2.5)
task.subscribe(
onNext: { (value) in
print("B onNext: \(value)")
},
onCompleted: {
print("B onCompleted()")
},
onDisposed: {
print("B onDisposed()")
})
.addDisposableTo(disposeBag)
A onNext: hoge
A onNext: fuga
A onNext: piyo
B onNext: piyo
A onCompleted()
A onDisposed()
B onCompleted()
B onDisposed()
- "B"のSubscriberは2.5秒後に開始しているので、"hoge", "fuga"を購読できない
let source = Observable<String>.create { (observer) in
observer.onNext("hoge at \(Date().timeIntervalSince1970)")
observer.onNext("fuga at \(Date().timeIntervalSince1970)")
observer.onCompleted()
return Disposables.create()
}
print("A subscribe() at \(Date().timeIntervalSince1970)")
source.subscribe(
onNext: { (value) in
print("A onNext: \(value)")
},
onCompleted: {
print("A onCompleted()")
})
.dispose()
DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
print("B subscribe() at \(Date().timeIntervalSince1970)")
source.subscribe(
onNext: { (value) in
print("B onNext: \(value)")
},
onCompleted: {
print("B onCompleted()")
})
.dispose()
}
A subscribe() at 1477905878.70609
A onNext: hoge at 1477905878.70654
A onNext: fuga at 1477905878.70708
A onCompleted()
B subscribe() at 1477905883.70776
B onNext: hoge at 1477905883.70793
B onNext: fuga at 1477905883.70812
B onCompleted()
func asyncTask() -> Observable<String> {
let subject = PublishSubject<String>()
DispatchQueue.global().asyncAfter(deadline: .now() + 1) {
subject.onNext("hoge at \(Date().timeIntervalSince1970)")
}
DispatchQueue.global().asyncAfter(deadline: .now() + 2) {
subject.onNext("fuga at \(Date().timeIntervalSince1970)")
}
DispatchQueue.global().asyncAfter(deadline: .now() + 3) {
subject.onNext("piyo at \(Date().timeIntervalSince1970)")
}
DispatchQueue.global().asyncAfter(deadline: .now() + 4) {
subject.onCompleted()
}
return subject.asObservable()
}
// main
let disposeBag = DisposeBag()
let task = asyncTask()
task.subscribe(
onNext: { (value) in
print("A onNext: \(value)")
},
onCompleted: {
print("A onCompleted()")
})
.addDisposableTo(disposeBag)
Thread.sleep(forTimeInterval: 2.5)
task.subscribe(
onNext: { (value) in
print("B onNext: \(value)")
},
onCompleted: {
print("B onCompleted()")
})
.addDisposableTo(disposeBag)
A onNext: hoge at 1477906080.82776
A onNext: fuga at 1477906081.92827
A onNext: piyo at 1477906082.7435
B onNext: piyo at 1477906082.7435
A onCompleted()
B onCompleted()
let disposeBag = DisposeBag()
let coldObservable = Observable<String>.create { (observer) in
observer.onNext("hoge at \(Date().timeIntervalSince1970)")
observer.onNext("fuga at \(Date().timeIntervalSince1970)")
observer.onCompleted()
return Disposables.create()
}
let hotObservable = coldObservable.publish()
print("A subscribe() at \(Date().timeIntervalSince1970)")
hotObservable.subscribe(
onNext: { (value) in
print("A onNext: \(value)")
},
onCompleted: {
print("A onCompleted()")
})
.addDisposableTo(disposeBag)
DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
print("B subscribe() at \(Date().timeIntervalSince1970)")
hotObservable.subscribe(
onNext: { (value) in
print("B onNext: \(value)")
},
onCompleted: {
print("B onCompleted()")
})
.addDisposableTo(disposeBag)
}
DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
hotObservable.connect().addDisposableTo(disposeBag)
}
DispatchQueue.main.asyncAfter(deadline: .now() + 8) {
print("C subscribe() at \(Date().timeIntervalSince1970)")
hotObservable.subscribe(
onNext: { (value) in
print("C onNext: \(value)")
},
onCompleted: {
print("C onCompleted()")
})
.addDisposableTo(disposeBag)
}
A subscribe() at 1477909112.28208
B subscribe() at 1477909115.58164
A onNext: hoge at 1477909117.77993
B onNext: hoge at 1477909117.77993
A onNext: fuga at 1477909117.78148
B onNext: fuga at 1477909117.78148
A onCompleted()
B onCompleted()
C subscribe() at 1477909121.0829
C onCompleted()
func asyncTask() -> Observable<String> {
let subject = PublishSubject<String>()
DispatchQueue.global().asyncAfter(deadline: .now() + 1) {
subject.onNext("hoge")
}
DispatchQueue.global().asyncAfter(deadline: .now() + 2) {
subject.onNext("fuga")
}
DispatchQueue.global().asyncAfter(deadline: .now() + 3) {
subject.onNext("piyo")
}
DispatchQueue.global().asyncAfter(deadline: .now() + 4) {
subject.onCompleted()
}
return subject.asObservable()
}
// main
let disposeBag = DisposeBag()
let coldObservable = asyncTask()
.map { "\($0) at \(Date().timeIntervalSince1970)" }
coldObservable.subscribe(
onNext: { (value) in
print("A onNext: \(value)")
},
onCompleted: {
print("A onCompleted()")
})
.addDisposableTo(disposeBag)
coldObservable.subscribe(
onNext: { (value) in
print("B onNext: \(value)")
},
onCompleted: {
print("B onCompleted()")
})
.addDisposableTo(disposeBag)
Thread.sleep(forTimeInterval: 2.5)
coldObservable.subscribe(
onNext: { (value) in
print("C onNext: \(value)")
},
onCompleted: {
print("C onCompleted()")
})
.addDisposableTo(disposeBag)
A onNext: hoge at 1477911498.45086
B onNext: hoge at 1477911498.45198
A onNext: fuga at 1477911499.55275
B onNext: fuga at 1477911499.55311
A onNext: piyo at 1477911500.65575
B onNext: piyo at 1477911500.65621
C onNext: piyo at 1477911500.65654
A onCompleted()
B onCompleted()
C onCompleted()
class ViewController: UIViewController {
let disposeBag = DisposeBag()
override func viewDidLoad() {
super.viewDidLoad()
observableA
.subscribe(onNext: {
// ...
})
.addDisposableTo(self.disposeBag)
observableB
.subscribe(onNext: {
// ...
})
.addDisposableTo(self.disposeBag)
}
}
let serialScheduler = SerialDispatchQueueScheduler(queue: DispatchQueue.global(), internalSerialQueueName: "global_serial_queue")
let observable = Observable.from(0..<5)
observable
.observeOn(serialScheduler)
.subscribe { print("A: \($0) at \(Thread.current)") }
observable
.observeOn(serialScheduler)
.subscribe { print("B: \($0) at \(Thread.current)") }
A: next(0) at <NSThread: 0x608000078300>{number = 3, name = (null)}
A: next(1) at <NSThread: 0x608000078300>{number = 3, name = (null)}
A: next(2) at <NSThread: 0x608000078300>{number = 3, name = (null)}
A: next(3) at <NSThread: 0x608000078300>{number = 3, name = (null)}
A: next(4) at <NSThread: 0x608000078300>{number = 3, name = (null)}
A: completed at <NSThread: 0x608000078300>{number = 3, name = (null)}
B: next(0) at <NSThread: 0x608000078300>{number = 3, name = (null)}
B: next(1) at <NSThread: 0x608000078300>{number = 3, name = (null)}
B: next(2) at <NSThread: 0x608000078300>{number = 3, name = (null)}
B: next(3) at <NSThread: 0x608000078300>{number = 3, name = (null)}
B: next(4) at <NSThread: 0x608000078300>{number = 3, name = (null)}
B: completed at <NSThread: 0x608000078300>{number = 3, name = (null)}
let concurrentScheduler = ConcurrentDispatchQueueScheduler(queue: DispatchQueue.global())
let observable = Observable.from(0..<5)
observable
.observeOn(concurrentScheduler)
.subscribe { print("A: \($0) at \(Thread.current)") }
observable
.observeOn(concurrentScheduler)
.subscribe { print("B: \($0) at \(Thread.current)") }
A: next(0) at <NSThread: 0x61800007ea00>{number = 3, name = (null)}
B: next(0) at <NSThread: 0x600000079900>{number = 4, name = (null)}
A: next(1) at <NSThread: 0x600000079a40>{number = 5, name = (null)}
B: next(1) at <NSThread: 0x61800007ea00>{number = 3, name = (null)}
A: next(2) at <NSThread: 0x600000079a40>{number = 5, name = (null)}
B: next(2) at <NSThread: 0x600000079900>{number = 4, name = (null)}
B: next(3) at <NSThread: 0x600000079a40>{number = 5, name = (null)}
A: next(3) at <NSThread: 0x61800007ea00>{number = 3, name = (null)}
B: next(4) at <NSThread: 0x600000079900>{number = 4, name = (null)}
B: completed at <NSThread: 0x61800007ea00>{number = 3, name = (null)}
A: next(4) at <NSThread: 0x600000079a40>{number = 5, name = (null)}
A: completed at <NSThread: 0x600000079900>{number = 4, name = (null)}
let concurrentScheduler = ConcurrentDispatchQueueScheduler(queue: DispatchQueue.global())
let observable = Observable<Int>.create { (observer) in
print("subscribed at \(Thread.current)")
observer.onNext(1)
observer.onNext(2)
observer.onCompleted()
return Disposables.create()
}
observable
.subscribeOn(concurrentScheduler)
.observeOn(MainScheduler.instance)
.subscribe { print("\($0) at \(Thread.current)") }
subscribed at <NSThread: 0x61000007e840>{number = 3, name = (null)}
next(1) at <NSThread: 0x61800007e280>{number = 1, name = main}
next(2) at <NSThread: 0x61800007e280>{number = 1, name = main}
completed at <NSThread: 0x61800007e280>{number = 1, name = main}
Observable.create()
のブロック実行は<NSThread: 0x61000007e840>{number = 3, name = (null)}
subscribe()
のブロック実行は<NSThread: 0x61800007e280>{number = 1, name = main}
になっているのが確認できる