Skip to content

Instantly share code, notes, and snippets.

@danielt1263
Last active February 20, 2024 13:41
Show Gist options
  • Save danielt1263/820bd8091048aa138c82151966f7c66d to your computer and use it in GitHub Desktop.
Save danielt1263/820bd8091048aa138c82151966f7c66d to your computer and use it in GitHub Desktop.
The pollContinuously operator is useful if you want to subscribe to an Observable again after it finishes...
//
// PollContinuously+Rx.swift
//
// Created by Daniel Tartaglia on 20 Jun 2023.
// Copyright © 2023 Daniel Tartaglia. MIT License.
//
import RxSwift
extension ObservableType {
func pollContinuously(delay: RxTimeInterval, scheduler: SchedulerType) -> Observable<Element> {
.create { observer in
let subject = PublishSubject<Element>()
let output = subject
.subscribe(observer)
let loop = subject
.concatMap { _ in
Observable.just(())
.delay(delay, scheduler: scheduler)
.flatMap { self }
}
.observe(on: scheduler)
.subscribe(subject)
let input = Observable.concat(self.asObservable(), .never())
.subscribe(subject)
return CompositeDisposable(output, loop, input)
}
}
}
class PollContinuouslyTests: XCTestCase {
func test1() {
let scheduler = TestScheduler(initialClock: 0)
let source = scheduler.createObservable(timeline: "-A|-B|-C|-D|")
let expected = parseEventsAndTimes(timeline: "-A--B--C--D|", values: { String($0) })
.offsetTime(by: 200)
let result = scheduler.start {
source.pollContinuously(delay: .seconds(1), scheduler: scheduler)
.take(4)
}
XCTAssertEqual(result.events, expected[0])
}
func test2() {
let scheduler = TestScheduler(initialClock: 0)
let source = scheduler.createObservable(timeline: "-A|-B|-C|-D|")
let expected = parseEventsAndTimes(timeline: "-A---B---C---D|", values: { String($0) })
.offsetTime(by: 200)
let result = scheduler.start {
source.pollContinuously(delay: .seconds(2), scheduler: scheduler)
.take(4)
}
XCTAssertEqual(result.events, expected[0])
}
func test2_2() {
let scheduler = TestScheduler(initialClock: 0)
let source = scheduler.createObservable(timeline: "-A-B|-C-D|-E-F|-G-H|")
let expected = parseEventsAndTimes(timeline: "-A-B-C-D--E-F--G-H|", values: { String($0) })
.offsetTime(by: 200)
let result = scheduler.start {
source.pollContinuously(delay: .seconds(2), scheduler: scheduler)
.take(8)
}
XCTAssertEqual(result.events, expected[0])
}
}
@giofid
Copy link

giofid commented Feb 20, 2024

Hello @danielt1263,
I noted a Reentrancy anomaly was detected warning when an error occurs during polling. I created a snippet of code to reproduce the problem.

struct MyError: Error { }

func polling() -> Observable<Int> {
    observable().pollContinuously(delay: .seconds(1), scheduler: MainScheduler.instance).take(6)
}

func observable() -> Observable<Int> {
    Observable<Int>.create { observer in
        observer.onError(MyError())
        return Disposables.create {}
    }
}

....

polling()
    .observe(on: MainScheduler.instance)
    .subscribe { value in
        print(value)
    } onError: { error in
        print(error)
    } onDisposed: {
        print("Disposed")
    }
    .disposed(by: disposeBag)

@danielt1263
Copy link
Author

@giofid That is expected behavior with any feedback loop such as this, and is a case where you should be using MainScheduler.asyncInstance instead of .instance.

If you want, you can force the issue like RxFeedback does. See the property var async on line 141 for more information.

@giofid
Copy link

giofid commented Feb 20, 2024

Thank you @danielt1263 for your feedback.

Sorry, I didn't understand your last observation.

If you want, you can force the issue like RxFeedback does. See the property var async on line 141 for more information.

Can you please explain that better?

@danielt1263
Copy link
Author

It's fully explained on line 141 of the link. I can't do better than it does.

@giofid
Copy link

giofid commented Feb 20, 2024

I realized what you meant, I think.

Using let asyncScheduler = scheduler.async instead of scheduler inside pollContinuously.

Thanks as always Daniel!

@danielt1263
Copy link
Author

Yes, exactly. Either that or just pass MainScheduler.asyncInstance to pollContinuously in the first place.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment