@darrensapalo
Last active November 16, 2016 17:05
Observable.zip guarantees that an emission will trigger only when both/all the observables have emitted.
import Foundation
import RxSwift

let backgroundThread = DispatchQueue(label: "com.rx.testing")

// Emits 1 through 5 as fast as possible; the subscription work runs on the background queue.
let numbers: Observable<Int> = Observable.create { obx in
    [1, 2, 3, 4, 5].forEach { e in
        obx.onNext(e)
    }
    obx.onCompleted()
    return Disposables.create()
}.subscribeOn(ConcurrentDispatchQueueScheduler(queue: backgroundThread))

// Emits 2, 4, 6, 8, 10 slowly, pausing two seconds after each element.
let doubleTheNumbers: Observable<Int> = Observable.create { obx in
    [1, 2, 3, 4, 5].forEach { e in
        obx.onNext(e * 2)
        sleep(2)
    }
    obx.onCompleted()
    return Disposables.create()
}

// zip emits a pair only once both sources have produced their next element.
Observable.zip(numbers, doubleTheNumbers) { a, b in
    return (a, b)
}
.subscribe(onNext: { e in
    print(e)
})

/** Output
(1, 2)
(2, 4)
(3, 6)
(4, 8)
(5, 10)
*/

darrensapalo commented Nov 16, 2016

Staying in the monad: You are expecting devices and deviceIDs to be updated in sync, when in reality there are no guarantees they are.

Correct me if I'm wrong, but even if one of the observables takes a while to emit (e.g. because of the sleep call in the doubleTheNumbers observable) and the other observable, numbers, runs on a separate thread, Observable.zip only emits once it has received a new element from each of the observables passed to it, and it pairs those elements by position.

In the rxmarbles.com example, notice that the elements turn up as 3C and 4D, even though element 3 took a while to emit. Does this address your worry that there are no guarantees that devices and deviceIDs are updated in sync?
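To make the pairing rule concrete, here is a minimal sketch in plain Swift, with no RxSwift involved. `ZipPairer` is a hypothetical toy type invented for this illustration, not RxSwift's actual implementation; it only models the idea that each side is buffered and a pair is released when both sides have an element waiting.

```swift
/// Toy model of zip's pairing rule (hypothetical, not RxSwift code):
/// buffer each side and release a pair only when both buffers are non-empty.
struct ZipPairer<A, B> {
    private var left: [A] = []
    private var right: [B] = []

    init() {}

    /// Feed a value from the first source; returns a pair if one is ready.
    mutating func pushLeft(_ value: A) -> (A, B)? {
        left.append(value)
        return nextPair()
    }

    /// Feed a value from the second source; returns a pair if one is ready.
    mutating func pushRight(_ value: B) -> (A, B)? {
        right.append(value)
        return nextPair()
    }

    /// Pops the oldest element from each side, so pairing is by position.
    private mutating func nextPair() -> (A, B)? {
        guard !left.isEmpty, !right.isEmpty else { return nil }
        return (left.removeFirst(), right.removeFirst())
    }
}

var pairer = ZipPairer<Int, Int>()
var pairs: [(Int, Int)] = []

// The fast source delivers all of its elements up front; nothing is paired yet.
for n in [1, 2, 3, 4, 5] {
    if let pair = pairer.pushLeft(n) { pairs.append(pair) }
}

// The slow source arrives later; each arrival completes exactly one pair.
for n in [1, 2, 3, 4, 5] {
    if let pair = pairer.pushRight(n * 2) { pairs.append(pair) }
}

print(pairs)  // [(1, 2), (2, 4), (3, 6), (4, 8), (5, 10)]
```

Even though one side finished long before the other started, the first element is still paired with the first, the second with the second, and so on. This is the same behaviour as the 3C and 4D pairs in the marble diagram.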


darrensapalo commented Nov 16, 2016

On the way to exit monad: you are comparing lastID == currentID, but there is again no guarantee that device.id.subscribe(onNext: { (id: String) in currentID = id }) has finished executing.

An example of what you're worried about (there is no guarantee that the subscribe has finished executing) is seen in the code snippet below:

Wrong code, asynchronous

import RxSwift
import Foundation
import XCPlayground

let backgroundThread = DispatchQueue(label: "com.rx.testing")

print("Step 1")

Observable.from([1,2,3,4])
    .subscribeOn(ConcurrentDispatchQueueScheduler.init(queue: backgroundThread))
    .observeOn(MainScheduler.instance)
    .subscribe( onNext: { e in sleep(1); print(e) })

print("Step 2")

XCPlaygroundPage.currentPage.needsIndefiniteExecution = true

Because subscribeOn moves the subscription onto the background queue and observeOn hands each element back to the main thread, subscribe returns before any element is printed. The output of the code is:

Step 1
Step 2
1
2
3
4

If I understood correctly, your worry is that the closure might return before the subscribe call at the line device.id.subscribe(onNext: { (id: String) in currentID = id }) has finished.

Correct code, synchronous

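Notice that my example uses neither subscribeOn nor observeOn to specify which thread the code will execute on. The code below likewise comments out subscribeOn, so the subscription starts on the calling (main) thread, and the remaining observeOn(MainScheduler.instance) targets that same thread. The output is our desired result, which is synchronous code: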

import RxSwift
import Foundation
import XCPlayground

let backgroundThread = DispatchQueue(label: "com.rx.testing")

print("Step 1")

Observable.from([1,2,3,4])
    // .subscribeOn(ConcurrentDispatchQueueScheduler.init(queue: backgroundThread))
    .observeOn(MainScheduler.instance)
    .subscribe( onNext: { e in sleep(1); print(e) })

print("Step 2")

XCPlaygroundPage.currentPage.needsIndefiniteExecution = true

Its output is the following:

Step 1
1
2
3
4
Step 2
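The two orderings above can be reproduced without RxSwift. The sketch below is only a toy model: `ToyScheduler`, `trace`, and `readRightAfterSubscribe` are hypothetical names made up for this illustration, and `"device-42"` is a placeholder value. It assumes nothing about how RxSwift schedulers are implemented; it only contrasts work that runs immediately with work that is parked until later.

```swift
/// Hypothetical stand-in for a scheduler: either runs work immediately
/// or parks it until `drain()` is called.
final class ToyScheduler {
    private let deferred: Bool
    private var pending: [() -> Void] = []

    init(deferred: Bool) { self.deferred = deferred }

    func schedule(_ work: @escaping () -> Void) {
        if deferred { pending.append(work) } else { work() }
    }

    /// Runs everything that was parked, in order.
    func drain() {
        let jobs = pending
        pending.removeAll()
        jobs.forEach { $0() }
    }
}

/// Mirrors the shape of the two snippets: Step 1, subscribe, Step 2.
func trace(on scheduler: ToyScheduler) -> [String] {
    var log = ["Step 1"]
    scheduler.schedule { log += ["1", "2", "3", "4"] }
    log.append("Step 2")
    scheduler.drain()  // stands in for the parked work eventually running
    return log
}

/// Mirrors `subscribe(onNext: { currentID = $0 })` followed by an immediate read.
func readRightAfterSubscribe(on scheduler: ToyScheduler) -> String? {
    var currentID: String? = nil
    scheduler.schedule { currentID = "device-42" }
    return currentID  // read before any parked work has had a chance to run
}

print(trace(on: ToyScheduler(deferred: true)))
// ["Step 1", "Step 2", "1", "2", "3", "4"]
print(trace(on: ToyScheduler(deferred: false)))
// ["Step 1", "1", "2", "3", "4", "Step 2"]

print(readRightAfterSubscribe(on: ToyScheduler(deferred: true)) as Any)
// nil
print(readRightAfterSubscribe(on: ToyScheduler(deferred: false)) as Any)
// Optional("device-42")
```

In the deferred case the read of `currentID` happens before the assignment, which is the situation the concern describes. In the immediate case the assignment has already happened by the time the next line runs.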

I hope that clarifies why the code I wrote addresses your concerns. Please let me know if you have any more questions, and do correct me if I have misunderstood or gotten something wrong.
