-
-
Save darrensapalo/6f2bb5946d97cf87033b75aac6b98a43 to your computer and use it in GitHub Desktop.
import RxSwift | |
let backgroundThread = DispatchQueue(label: "com.rx.testing") | |
let numbers : Observable<Int> = Observable.create { obx in | |
[1,2,3,4,5].forEach { e in | |
obx.onNext(e) | |
} | |
obx.onCompleted() | |
return Disposables.create() | |
}.subscribeOn(ConcurrentDispatchQueueScheduler.init(queue: backgroundThread)) | |
let doubleTheNumbers : Observable<Int> = Observable.create { obx in | |
[1,2,3,4,5].forEach { e in | |
obx.onNext(e * 2) | |
sleep(2) | |
} | |
obx.onCompleted() | |
return Disposables.create() | |
} | |
Observable.zip(numbers, doubleTheNumbers) { a, b in | |
return (a, b) | |
} | |
.subscribe ( onNext: { e in | |
print(e) | |
}) | |
/** Outupt | |
(1, 2) | |
(2, 4) | |
(3, 6) | |
(4, 8) | |
(5, 10) | |
*/ |
On the way to exit monad. you are comparing lastID == currentID but there is again no guarantee that device.id.subscribe(onNext: { (id: String) in currentID = id }) has finished executing
An example of what you're worried about (there is no guarantee that the subscribe has finished executing) is seen in the code snippet below:
Wrong code, asynchronous
import RxSwift
import Foundation
import XCPlayground
let backgroundThread = DispatchQueue(label: "com.rx.testing")
print("Step 1")
Observable.from([1,2,3,4])
.subscribeOn(ConcurrentDispatchQueueScheduler.init(queue: backgroundThread))
.observeOn(MainScheduler.instance)
.subscribe( onNext: { e in sleep(1); print(e) })
print("Step 2")
XCPlaygroundPage.currentPage.needsIndefiniteExecution = true
Since the thread on which the observable code will run was specified using the subscribeOn
operator, the output of the code is:
Step 1
Step 2
1
2
3
4
If I understood correctly, your worry is that the closure might return even before the subscribe has finished at the line device.id.subscribe(onNext: { (id: String) in currentID = id })
.
Correct code, synchronous
Notice that in my example and in the code below that (1) we do not specify using either subscribeOn
or observeOn
which thread the code will execute and (2) the output is our desired result which is synchronous code:
import RxSwift
import Foundation
import XCPlayground
let backgroundThread = DispatchQueue(label: "com.rx.testing")
print("Step 1")
Observable.from([1,2,3,4])
// .subscribeOn(ConcurrentDispatchQueueScheduler.init(queue: backgroundThread))
.observeOn(MainScheduler.instance)
.subscribe( onNext: { e in sleep(1); print(e) })
print("Step 2")
XCPlaygroundPage.currentPage.needsIndefiniteExecution = true
Its output is the following:
Step 1
1
2
3
4
Step 2
I hope that clarifies why the code I wrote addresses your concerns. Please let me know if you have anymore questions and do correct me if I misunderstand or if I'm getting something wrong.
Correct me if I'm wrong, but even if one of the observables take a while to execute (i.e.
sleep(1)
on thedoubleTheNumbers
observable) and the other observablenumbers
runs on a separate thread, theObservable.zip
method ensures that an emission will only be emitted when a new pair of emissions is received from the observables it received as parameters.In the rxmarbles.com example, notice that the elements turn up as 3C and 4D, even if the element
3
took a while to emit. Does this address your worry about there are no guarantees that the devices and deviceIDs are updated synchronized?