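Dive to Reactive Extension with Swift: Sample Code

How to use this entry

https://speakerdeck.com/dameleon/dive-to-reactive-extension-with-swift

The sample code and output that correspond to each explanation in the slides above are listed below.

Sample code

Creating and subscribing to an Observable (Observable.create)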

let completeObservable = Observable<String>.create { (observer: AnyObserver<String>) -> Disposable in
    observer.onNext("hoge")
    observer.onNext("fuga")
    observer.onCompleted()
    observer.onNext("piyo")
    return Disposables.create {
        print("disposed")
    }
}

let disposable = completeObservable.subscribe(
    onNext: { (value) in
        print("completeObservable onNext: \(value)")
    },
    onError: { (err) in
        print("completeObservable onError: \(err)")
    },
    onCompleted: {
        print("completeObservable onCompleted()")
    },
    onDisposed: {
        print("completeObservable onDisposed()")
    })

disposable.dispose()

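Note: the type annotations are written out to make the structure easier to follow, but they are inferred, so specifying (observer: AnyObserver<String>) -> Disposable is not required.

Output: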

completeObservable onNext: hoge
completeObservable onNext: fuga
completeObservable onCompleted()
completeObservable onDisposed()
disposed
  • The onNext("piyo") call made after completion is not delivered to the stream
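
The rule in the bullet above can be modeled without RxSwift. The following is a minimal sketch in plain Swift, assuming a hypothetical `GatedObserver` class and `SketchEvent` enum (neither is an RxSwift type). It only illustrates the idea that an observer stops forwarding events once it has seen a terminal event; it is not a description of how RxSwift is implemented internally.

```swift
// Hypothetical model types for illustration only; not part of RxSwift.
enum SketchEvent<Element> {
    case next(Element)
    case completed
}

final class GatedObserver<Element> {
    private var isStopped = false
    private let handler: (SketchEvent<Element>) -> Void

    init(handler: @escaping (SketchEvent<Element>) -> Void) {
        self.handler = handler
    }

    // Forward the event unless a terminal event has already been seen.
    func on(_ event: SketchEvent<Element>) {
        guard !isStopped else { return }
        if case .completed = event { isStopped = true }
        handler(event)
    }
}

var log: [String] = []
let observer = GatedObserver<String> { event in
    switch event {
    case .next(let value): log.append("onNext: \(value)")
    case .completed: log.append("onCompleted()")
    }
}

observer.on(.next("hoge"))
observer.on(.next("fuga"))
observer.on(.completed)
observer.on(.next("piyo"))  // dropped: the observer has already completed

print(log)  // ["onNext: hoge", "onNext: fuga", "onCompleted()"]
```
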

Raising an error

enum SampleError: Error {
    case Test
}

let errorObservable = Observable<String>.create { (observer: AnyObserver<String>) -> Disposable in
    observer.onNext("hoge")
    observer.onNext("fuga")
    observer.onError(SampleError.Test)
    observer.onNext("piyo")
    return Disposables.create {
        print("disposed")
    }
}

let disposable = errorObservable.subscribe(
    onNext: { (value) in
        print("errorObservable onNext: \(value)")
    },
    onError: { (err) in
        print("errorObservable onError: \(err)")
    },
    onCompleted: {
        print("errorObservable onCompleted()")
    },
    onDisposed: {
        print("errorObservable onDisposed()")
    })

disposable.dispose()

Output:

errorObservable onNext: hoge
errorObservable onNext: fuga
errorObservable onError: Test
errorObservable onDisposed()
disposed
  • The onNext("piyo") call made after the error is not delivered to the stream

Creating and subscribing to an Observable (PublishSubject)

func asyncTask() -> Observable<String> {
    let subject = PublishSubject<String>()
    
    DispatchQueue.global().asyncAfter(deadline: .now() + 1) {
        subject.onNext("hoge")
    }
    DispatchQueue.global().asyncAfter(deadline: .now() + 2) {
        subject.onNext("fuga")
    }
    DispatchQueue.global().asyncAfter(deadline: .now() + 3) {
        subject.onNext("piyo")
    }
    DispatchQueue.global().asyncAfter(deadline: .now() + 4) {
        subject.onCompleted()
    }
    return subject.asObservable()
}

// main
let disposeBag = DisposeBag()
let task = asyncTask()

task.subscribe(
    onNext: { (value) in
        print("A onNext: \(value)")
    },
    onCompleted: {
        print("A onCompleted()")
    },
    onDisposed: {
        print("A onDisposed()")
    })
    .addDisposableTo(disposeBag)

Thread.sleep(forTimeInterval: 2.5)

task.subscribe(
    onNext: { (value) in
        print("B onNext: \(value)")
    },
    onCompleted: {
        print("B onCompleted()")
    },
    onDisposed: {
        print("B onDisposed()")
    })
    .addDisposableTo(disposeBag)

Output:

A onNext: hoge
A onNext: fuga
A onNext: piyo
B onNext: piyo
A onCompleted()
A onDisposed()
B onCompleted()
B onDisposed()
  • Subscriber "B" starts subscribing 2.5 seconds later, so it cannot receive "hoge" and "fuga"
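
The reason a late subscriber misses earlier values can be sketched in plain Swift. The block below assumes a hypothetical `SketchSubject` class (not RxSwift's `PublishSubject`) that simply broadcasts each value to whoever is subscribed at that moment and keeps no history. Timing is replaced by call order to keep the example deterministic.

```swift
// Hypothetical model type for illustration only; not RxSwift's PublishSubject.
final class SketchSubject<Element> {
    private var observers: [(Element) -> Void] = []

    // Register an observer; it only sees values emitted from now on.
    func subscribe(_ observer: @escaping (Element) -> Void) {
        observers.append(observer)
    }

    // Broadcast a value to the observers registered at this moment.
    func onNext(_ value: Element) {
        observers.forEach { $0(value) }
    }
}

var received: [String] = []
let subject = SketchSubject<String>()

subject.subscribe { received.append("A: \($0)") }
subject.onNext("hoge")
subject.onNext("fuga")

// "B" subscribes after two values have already been emitted.
subject.subscribe { received.append("B: \($0)") }
subject.onNext("piyo")

print(received)  // ["A: hoge", "A: fuga", "A: piyo", "B: piyo"]
```
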

Emitting values (operator behavior) and branching a stream

Adding emission timestamps to the Observable.create example

let source = Observable<String>.create { (observer) in
    observer.onNext("hoge at \(Date().timeIntervalSince1970)")
    observer.onNext("fuga at \(Date().timeIntervalSince1970)")
    observer.onCompleted()
    return Disposables.create()
}

print("A subscribe() at \(Date().timeIntervalSince1970)")
source.subscribe(
    onNext: { (value) in
        print("A onNext: \(value)")
    },
    onCompleted: {
        print("A onCompleted()")
    })
    .dispose()

DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
    print("B subscribe() at \(Date().timeIntervalSince1970)")
    source.subscribe(
        onNext: { (value) in
            print("B onNext: \(value)")
        },
        onCompleted: {
            print("B onCompleted()")
        })
        .dispose()
}

Output:

A subscribe() at 1477905878.70609
A onNext: hoge at 1477905878.70654
A onNext: fuga at 1477905878.70708
A onCompleted()
B subscribe() at 1477905883.70776
B onNext: hoge at 1477905883.70793
B onNext: fuga at 1477905883.70812
B onCompleted()
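
In the output above, "B" receives freshly generated timestamps, which suggests that the closure passed to Observable.create runs again for each subscription. A minimal plain-Swift sketch of that behavior follows, assuming a hypothetical `ColdSketch` type (not an RxSwift API) and a run counter in place of timestamps.

```swift
// Hypothetical model type for illustration only; not part of RxSwift.
struct ColdSketch<Element> {
    // The producer closure is stored and re-run on every subscription.
    let onSubscribe: (@escaping (Element) -> Void) -> Void

    func subscribe(_ observer: @escaping (Element) -> Void) {
        onSubscribe(observer)
    }
}

var runs = 0
var out: [String] = []

let source = ColdSketch<String>(onSubscribe: { emit in
    runs += 1  // counts how many times the producer has been executed
    emit("hoge (run \(runs))")
    emit("fuga (run \(runs))")
})

source.subscribe { out.append("A: \($0)") }
source.subscribe { out.append("B: \($0)") }

print(out)
// ["A: hoge (run 1)", "A: fuga (run 1)", "B: hoge (run 2)", "B: fuga (run 2)"]
```
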

Adding emission timestamps to the PublishSubject example

func asyncTask() -> Observable<String> {
    let subject = PublishSubject<String>()
    
    DispatchQueue.global().asyncAfter(deadline: .now() + 1) {
        subject.onNext("hoge at \(Date().timeIntervalSince1970)")
    }
    DispatchQueue.global().asyncAfter(deadline: .now() + 2) {
        subject.onNext("fuga at \(Date().timeIntervalSince1970)")
    }
    DispatchQueue.global().asyncAfter(deadline: .now() + 3) {
        subject.onNext("piyo at \(Date().timeIntervalSince1970)")
    }
    DispatchQueue.global().asyncAfter(deadline: .now() + 4) {
        subject.onCompleted()
    }
    return subject.asObservable()
}

// main
let disposeBag = DisposeBag()
let task = asyncTask()

task.subscribe(
    onNext: { (value) in
        print("A onNext: \(value)")
    },
    onCompleted: {
        print("A onCompleted()")
    })
    .addDisposableTo(disposeBag)

Thread.sleep(forTimeInterval: 2.5)

task.subscribe(
    onNext: { (value) in
        print("B onNext: \(value)")
    },
    onCompleted: {
        print("B onCompleted()")
    })
    .addDisposableTo(disposeBag)

Output:

A onNext: hoge at 1477906080.82776
A onNext: fuga at 1477906081.92827
A onNext: piyo at 1477906082.7435
B onNext: piyo at 1477906082.7435
A onCompleted()
B onCompleted()

Effect on the upstream

Converting a Cold Observable to a Hot Observable

let disposeBag = DisposeBag()
let coldObservable = Observable<String>.create { (observer) in
    observer.onNext("hoge at \(Date().timeIntervalSince1970)")
    observer.onNext("fuga at \(Date().timeIntervalSince1970)")
    observer.onCompleted()
    return Disposables.create()
}
let hotObservable = coldObservable.publish()

print("A subscribe() at \(Date().timeIntervalSince1970)")
hotObservable.subscribe(
    onNext: { (value) in
        print("A onNext: \(value)")
    },
    onCompleted: {
        print("A onCompleted()")
    })
    .addDisposableTo(disposeBag)

DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
    print("B subscribe() at \(Date().timeIntervalSince1970)")
    hotObservable.subscribe(
        onNext: { (value) in
            print("B onNext: \(value)")
        },
        onCompleted: {
            print("B onCompleted()")
        })
        .addDisposableTo(disposeBag)
}

DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
    hotObservable.connect().addDisposableTo(disposeBag)
}

DispatchQueue.main.asyncAfter(deadline: .now() + 8) {
    print("C subscribe() at \(Date().timeIntervalSince1970)")
    hotObservable.subscribe(
        onNext: { (value) in
            print("C onNext: \(value)")
        },
        onCompleted: {
            print("C onCompleted()")
        })
        .addDisposableTo(disposeBag)
}

Output:

A subscribe() at 1477909112.28208
B subscribe() at 1477909115.58164
A onNext: hoge at 1477909117.77993
B onNext: hoge at 1477909117.77993
A onNext: fuga at 1477909117.78148
B onNext: fuga at 1477909117.78148
A onCompleted()
B onCompleted()
C subscribe() at 1477909121.0829
C onCompleted()

Converting a Hot Observable to a Cold Observable

func asyncTask() -> Observable<String> {
    let subject = PublishSubject<String>()
    
    DispatchQueue.global().asyncAfter(deadline: .now() + 1) {
        subject.onNext("hoge")
    }
    DispatchQueue.global().asyncAfter(deadline: .now() + 2) {
        subject.onNext("fuga")
    }
    DispatchQueue.global().asyncAfter(deadline: .now() + 3) {
        subject.onNext("piyo")
    }
    DispatchQueue.global().asyncAfter(deadline: .now() + 4) {
        subject.onCompleted()
    }
    return subject.asObservable()
}

// main
let disposeBag = DisposeBag()
let coldObservable = asyncTask()
    .map { "\($0) at \(Date().timeIntervalSince1970)" }

coldObservable.subscribe(
    onNext: { (value) in
        print("A onNext: \(value)")
    },
    onCompleted: {
        print("A onCompleted()")
    })
    .addDisposableTo(disposeBag)

coldObservable.subscribe(
    onNext: { (value) in
        print("B onNext: \(value)")
    },
    onCompleted: {
        print("B onCompleted()")
    })
    .addDisposableTo(disposeBag)

Thread.sleep(forTimeInterval: 2.5)

coldObservable.subscribe(
    onNext: { (value) in
        print("C onNext: \(value)")
    },
    onCompleted: {
        print("C onCompleted()")
    })
    .addDisposableTo(disposeBag)

Output:

A onNext: hoge at 1477911498.45086
B onNext: hoge at 1477911498.45198
A onNext: fuga at 1477911499.55275
B onNext: fuga at 1477911499.55311
A onNext: piyo at 1477911500.65575
B onNext: piyo at 1477911500.65621
C onNext: piyo at 1477911500.65654
A onCompleted()
B onCompleted()
C onCompleted()

Disposable

DisposeBag

class ViewController: UIViewController {
    let disposeBag = DisposeBag()

    override func viewDidLoad() {
        super.viewDidLoad()
        observableA
            .subscribe(onNext: {
                // ...
            })
            .addDisposableTo(self.disposeBag)
        observableB
            .subscribe(onNext: {
                // ...
            })
            .addDisposableTo(self.disposeBag)
    }
}

Schedulers

Serial Scheduler

let serialScheduler = SerialDispatchQueueScheduler(queue: DispatchQueue.global(), internalSerialQueueName: "global_serial_queue")
let observable = Observable.from(0..<5)

observable
    .observeOn(serialScheduler)
    .subscribe { print("A: \($0) at \(Thread.current)") }
observable
    .observeOn(serialScheduler)
    .subscribe { print("B: \($0) at \(Thread.current)") }

Output:

A: next(0) at <NSThread: 0x608000078300>{number = 3, name = (null)}
A: next(1) at <NSThread: 0x608000078300>{number = 3, name = (null)}
A: next(2) at <NSThread: 0x608000078300>{number = 3, name = (null)}
A: next(3) at <NSThread: 0x608000078300>{number = 3, name = (null)}
A: next(4) at <NSThread: 0x608000078300>{number = 3, name = (null)}
A: completed at <NSThread: 0x608000078300>{number = 3, name = (null)}
B: next(0) at <NSThread: 0x608000078300>{number = 3, name = (null)}
B: next(1) at <NSThread: 0x608000078300>{number = 3, name = (null)}
B: next(2) at <NSThread: 0x608000078300>{number = 3, name = (null)}
B: next(3) at <NSThread: 0x608000078300>{number = 3, name = (null)}
B: next(4) at <NSThread: 0x608000078300>{number = 3, name = (null)}
B: completed at <NSThread: 0x608000078300>{number = 3, name = (null)}

Concurrent Scheduler

let concurrentScheduler = ConcurrentDispatchQueueScheduler(queue: DispatchQueue.global())
let observable = Observable.from(0..<5)

observable
    .observeOn(concurrentScheduler)
    .subscribe { print("A: \($0) at \(Thread.current)") }
observable
    .observeOn(concurrentScheduler)
    .subscribe { print("B: \($0) at \(Thread.current)") }

Output:

A: next(0) at <NSThread: 0x61800007ea00>{number = 3, name = (null)}
B: next(0) at <NSThread: 0x600000079900>{number = 4, name = (null)}
A: next(1) at <NSThread: 0x600000079a40>{number = 5, name = (null)}
B: next(1) at <NSThread: 0x61800007ea00>{number = 3, name = (null)}
A: next(2) at <NSThread: 0x600000079a40>{number = 5, name = (null)}
B: next(2) at <NSThread: 0x600000079900>{number = 4, name = (null)}
B: next(3) at <NSThread: 0x600000079a40>{number = 5, name = (null)}
A: next(3) at <NSThread: 0x61800007ea00>{number = 3, name = (null)}
B: next(4) at <NSThread: 0x600000079900>{number = 4, name = (null)}
B: completed at <NSThread: 0x61800007ea00>{number = 3, name = (null)}
A: next(4) at <NSThread: 0x600000079a40>{number = 5, name = (null)}
A: completed at <NSThread: 0x600000079900>{number = 4, name = (null)}

Switching Schedulers

let concurrentScheduler = ConcurrentDispatchQueueScheduler(queue: DispatchQueue.global())
let observable = Observable<Int>.create { (observer) in
    print("subscribed at \(Thread.current)")
    observer.onNext(1)
    observer.onNext(2)
    observer.onCompleted()
    return Disposables.create()
}
    
observable
    .subscribeOn(concurrentScheduler)
    .observeOn(MainScheduler.instance)
    .subscribe { print("\($0) at \(Thread.current)") }

Output:

subscribed at <NSThread: 0x61000007e840>{number = 3, name = (null)}
next(1) at <NSThread: 0x61800007e280>{number = 1, name = main}
next(2) at <NSThread: 0x61800007e280>{number = 1, name = main}
completed at <NSThread: 0x61800007e280>{number = 1, name = main}

The output confirms the following:

  • The Observable.create() block runs on <NSThread: 0x61000007e840>{number = 3, name = (null)}
  • The subscribe() block runs on <NSThread: 0x61800007e280>{number = 1, name = main}
