-
-
Save danielt1263/820bd8091048aa138c82151966f7c66d to your computer and use it in GitHub Desktop.
// | |
// PollContinuously+Rx.swift | |
// | |
// Created by Daniel Tartaglia on 20 Jun 2023. | |
// Copyright © 2023 Daniel Tartaglia. MIT License. | |
// | |
import RxSwift | |
extension ObservableType { | |
func pollContinuously(delay: RxTimeInterval, scheduler: SchedulerType) -> Observable<Element> { | |
.create { observer in | |
let subject = PublishSubject<Element>() | |
let output = subject | |
.subscribe(observer) | |
let loop = subject | |
.concatMap { _ in | |
Observable.just(()) | |
.delay(delay, scheduler: scheduler) | |
.flatMap { self } | |
} | |
.observe(on: scheduler) | |
.subscribe(subject) | |
let input = Observable.concat(self.asObservable(), .never()) | |
.subscribe(subject) | |
return CompositeDisposable(output, loop, input) | |
} | |
} | |
} |
class PollContinuouslyTests: XCTestCase { | |
func test1() { | |
let scheduler = TestScheduler(initialClock: 0) | |
let source = scheduler.createObservable(timeline: "-A|-B|-C|-D|") | |
let expected = parseEventsAndTimes(timeline: "-A--B--C--D|", values: { String($0) }) | |
.offsetTime(by: 200) | |
let result = scheduler.start { | |
source.pollContinuously(delay: .seconds(1), scheduler: scheduler) | |
.take(4) | |
} | |
XCTAssertEqual(result.events, expected[0]) | |
} | |
func test2() { | |
let scheduler = TestScheduler(initialClock: 0) | |
let source = scheduler.createObservable(timeline: "-A|-B|-C|-D|") | |
let expected = parseEventsAndTimes(timeline: "-A---B---C---D|", values: { String($0) }) | |
.offsetTime(by: 200) | |
let result = scheduler.start { | |
source.pollContinuously(delay: .seconds(2), scheduler: scheduler) | |
.take(4) | |
} | |
XCTAssertEqual(result.events, expected[0]) | |
} | |
func test2_2() { | |
let scheduler = TestScheduler(initialClock: 0) | |
let source = scheduler.createObservable(timeline: "-A-B|-C-D|-E-F|-G-H|") | |
let expected = parseEventsAndTimes(timeline: "-A-B-C-D--E-F--G-H|", values: { String($0) }) | |
.offsetTime(by: 200) | |
let result = scheduler.start { | |
source.pollContinuously(delay: .seconds(2), scheduler: scheduler) | |
.take(8) | |
} | |
XCTAssertEqual(result.events, expected[0]) | |
} | |
} |
@giofid That is expected behavior with any feedback loop such as this, and is a case where you should be using MainScheduler.asyncInstance
instead of .instance
.
If you want, you can force the issue like RxFeedback does. See the property var async
on line 141 for more information.
Thank you @danielt1263 for your feedback.
Sorry, I didn't understand your last observation.
If you want, you can force the issue like RxFeedback does. See the property var async on line 141 for more information.
Can you please explain that better?
It's fully explained on line 141 of the link. I can't do better than it does.
I realized what you meant, I think.
Using let asyncScheduler = scheduler.async
instead of scheduler
inside pollContinuously
.
Thanks as always Daniel!
Yes, exactly. Either that or just pass MainScheduler.asyncInstance
to pollContinuously
in the first place.
Hello @danielt1263,
I noted a Reentrancy anomaly was detected warning when an error occurs during polling. I created a snippet of code to reproduce the problem.