Skip to content

Instantly share code, notes, and snippets.

@simonpang
Last active November 27, 2018 07:22
Show Gist options
  • Save simonpang/1c2414e524d16e07f84696ad1d43455c to your computer and use it in GitHub Desktop.
Save simonpang/1c2414e524d16e07f84696ad1d43455c to your computer and use it in GitHub Desktop.
RxSwift multi-threading tests
//
// RxSwiftMultithreadTests.swift
// GoodNotesTests
//
// Created by Simon Pang on 27/11/2018.
// Copyright © 2018 Time Base Technology Limited. All rights reserved.
//
import XCTest
import RxSwift
/// Exploratory tests that drive RxSwift's `Variable` and `PublishSubject` from
/// inside their own subscription handlers and from one or more background threads.
///
/// The tests make no assertions about emitted values; they log every value that is
/// sent and received so the console output (including any diagnostics RxSwift itself
/// prints) can be inspected. A test only fails if one of its waits times out.
class RxSwiftMultithreadTests: XCTestCase {

    /// Upper bound, in seconds, for every asynchronous wait in this suite.
    /// The producers sleep for at most ~2 seconds in total, so this is generous
    /// while still failing reasonably fast if something hangs.
    private let waitTimeout: TimeInterval = 60

    /// Number of concurrent producers used by the multi-producer tests.
    private let producerCount = 4

    // MARK: Update while reading

    /// Writes to a `Variable` from inside its own `onNext` handler, synchronously on
    /// the subscribing thread, until the value reaches 50.
    /// Per the test name, RxSwift is expected to log reentrancy warnings here.
    func testUpdateVariableValue_WhenUpdateInsideSubscriptionBlock_Should_LogReentrancyWarnings() {
        let variable = Variable<Int>(0)
        let subscription = variable.asObservable()
            .subscribe(onNext: { (currentValue) in
                NSLog("Getting value \(currentValue)")
                if currentValue < 50 {
                    variable.value = currentValue + 1
                }
            }, onCompleted: {
                NSLog("Completed")
            })
        // The handler captures `variable`, so the subscription must be disposed
        // explicitly; otherwise the two keep each other alive.
        subscription.dispose()
    }

    /// Same feedback loop as above, but values are delivered through `observeOn` on a
    /// dedicated dispatch queue, so each write happens outside the previous delivery.
    /// The test waits until the value 5 has been observed before finishing.
    func testUpdateVariableValue_WhenUpdateInsideSubscriptionBlock_Should_Success() {
        let variable = Variable<Int>(0)
        let reachedLimit = XCTestExpectation(description: "value reached 5")
        let subscription = variable.asObservable()
            .observeOn(ConcurrentDispatchQueueScheduler(queue: DispatchQueue(label: "label")))
            .subscribe(onNext: { (currentValue) in
                NSLog("Getting value \(currentValue)")
                if currentValue < 5 {
                    variable.value = currentValue + 1
                } else {
                    // Delivery is asynchronous: signal the test thread once the
                    // loop has run to its end.
                    reachedLimit.fulfill()
                }
            }, onCompleted: {
                NSLog("Completed")
            })
        defer { subscription.dispose() }
        wait(for: [reachedLimit], timeout: waitTimeout)
    }

    /// Sends to a `PublishSubject` from inside its own `onNext` handler until the
    /// value reaches 50. Per the test name, RxSwift is expected to log reentrancy
    /// warnings here.
    func testPublishSubject_WhenUpdateInsideSubscriptionBlock_Should_LogReentrancyWarnings() {
        let subject = PublishSubject<Int>()
        let subscription = subject.asObservable()
            .subscribe(onNext: { (currentValue) in
                NSLog("Getting value \(currentValue)")
                if currentValue < 50 {
                    subject.onNext(currentValue + 1)
                }
            }, onCompleted: {
                NSLog("Completed")
            })
        subject.onNext(0)
        // The handler captures `subject`; dispose to break the reference cycle.
        subscription.dispose()
    }

    // MARK: Helpers

    /// Returns a pseudo-random number in `[0, 1)` with a resolution of 1/100000.
    func random() -> Double {
        return Double(arc4random_uniform(100000)) / 100000
    }

    /// Blocks the current thread for a random duration of up to 10 milliseconds.
    private func sleepBriefly() {
        Thread.sleep(until: Date() + 0.01 * random())
    }

    /// Subscribes to `source`, logging every received value together with whether it
    /// arrived on the main thread, then sleeping briefly to simulate work.
    ///
    /// - Parameter source: The sequence to observe.
    /// - Returns: The subscription; the caller is responsible for disposing it.
    private func subscribeLogging(_ source: Observable<Int>) -> Disposable {
        return source.subscribe(onNext: { (currentValue) in
            NSLog("Getting value \(currentValue) isMainThread: \(Thread.isMainThread)")
            self.sleepBriefly()
        }, onCompleted: {
            NSLog("Completed")
        })
    }

    /// Starts `count` producers on the global concurrent queue and blocks the test
    /// until every one of them has finished.
    ///
    /// Each producer calls `send` with `0..<valuesPerProducer`, sleeping briefly
    /// before each value.
    ///
    /// - Parameters:
    ///   - count: Number of producers to run concurrently.
    ///   - valuesPerProducer: Number of values each producer sends.
    ///   - completionQueue: When non-nil, a producer signals completion by enqueueing
    ///     onto this queue instead of signalling directly. For a serial queue that
    ///     `send` also dispatches to, this guarantees the producer's enqueued sends
    ///     have executed before it is counted as finished.
    ///   - send: Called on the producer's thread with each value.
    private func runProducers(
        count: Int,
        valuesPerProducer: Int,
        completingOn completionQueue: DispatchQueue? = nil,
        send: @escaping (Int) -> Void
    ) {
        let allFinished = XCTestExpectation(description: "wait")
        // One fulfilment per producer; without this the wait would end as soon as
        // the first producer finished.
        allFinished.expectedFulfillmentCount = count

        for _ in 0..<count {
            DispatchQueue.global().async {
                for i in 0..<valuesPerProducer {
                    self.sleepBriefly()
                    NSLog("Sending value \(i) backgroundThread: \(Thread.current)")
                    send(i)
                }
                if let completionQueue = completionQueue {
                    completionQueue.async { allFinished.fulfill() }
                } else {
                    allFinished.fulfill()
                }
            }
        }
        wait(for: [allFinished], timeout: waitTimeout)
    }

    // MARK: Variable Multi-Threading

    /// A single background producer writes 100 values to a `Variable` that is
    /// observed on the main scheduler.
    func testUpdateVariableValue_WhenUpdateFromBackgroundThread_Should_Success() {
        let variable = Variable<Int>(0)
        let subscription = subscribeLogging(
            variable.asObservable().observeOn(MainScheduler.instance)
        )
        // Dispose once the producers are done so the subscription stops receiving
        // events when the test ends.
        defer { subscription.dispose() }

        runProducers(count: 1, valuesPerProducer: 100) { variable.value = $0 }
    }

    /// Four background producers write 50 values each to the same `Variable` without
    /// any coordination. Per the test name, RxSwift is expected to log
    /// synchronization anomaly warnings here.
    func testUpdateVariableValue_WhenUpdateFromMultipleBackgroundThreads_Should_LogSynchronizationAnomalyWarnings() {
        let variable = Variable<Int>(0)
        let subscription = subscribeLogging(
            variable.asObservable().observeOn(MainScheduler.instance)
        )
        defer { subscription.dispose() }

        runProducers(count: producerCount, valuesPerProducer: 50) { variable.value = $0 }
    }

    /// Four background producers generate 50 values each, but every write to the
    /// `Variable` is funnelled through one serial dispatch queue.
    func testUpdateVariableValue_WhenUpdateFromASerialQueue_Should_Success() {
        let variable = Variable<Int>(0)
        // NOTE(review): unlike the sibling tests this uses `subscribeOn`, not
        // `observeOn`, so values are presumably handled on whichever thread performs
        // the write (the serial queue). Left unchanged — confirm this is intended.
        let subscription = subscribeLogging(
            variable.asObservable().subscribeOn(MainScheduler.instance)
        )
        defer { subscription.dispose() }

        let serialQueue = DispatchQueue(label: "serial queue")
        runProducers(
            count: producerCount,
            valuesPerProducer: 50,
            completingOn: serialQueue
        ) { value in
            serialQueue.async {
                variable.value = value
            }
        }
    }

    // MARK: PublishSubject Multi-threading

    /// A single background producer sends 100 values to a `PublishSubject` that is
    /// observed on the main scheduler.
    func testPublishSubject_WhenUpdateFromBackgroundThread_Should_Success() {
        let subject = PublishSubject<Int>()
        let subscription = subscribeLogging(
            subject.asObservable().observeOn(MainScheduler.instance)
        )
        defer { subscription.dispose() }

        runProducers(count: 1, valuesPerProducer: 100) { subject.onNext($0) }
    }

    /// Four background producers send 100 values each to the same `PublishSubject`
    /// without any coordination. Per the test name, RxSwift is expected to log
    /// synchronization warnings here.
    func testPublishSubject_WhenUpdateFromMultipleBackgroundThread_Should_LogSyncrhonizationWarnings() {
        let subject = PublishSubject<Int>()
        let subscription = subscribeLogging(
            subject.asObservable().observeOn(MainScheduler.instance)
        )
        defer { subscription.dispose() }

        runProducers(count: producerCount, valuesPerProducer: 100) { subject.onNext($0) }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment