Last active
February 20, 2024 13:41
-
-
Save danielt1263/820bd8091048aa138c82151966f7c66d to your computer and use it in GitHub Desktop.
The pollContinuously operator is useful if you want to subscribe to an Observable again after it finishes...
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//
//  PollContinuously+Rx.swift
//
//  Created by Daniel Tartaglia on 20 Jun 2023.
//  Copyright © 2023 Daniel Tartaglia. MIT License.
//
import RxSwift | |
extension ObservableType {
    /// Subscribes to the receiver, then keeps re-subscribing to it after a pause,
    /// forwarding every element to the subscriber.
    ///
    /// All elements are funnelled through one internal `PublishSubject`. Each element
    /// that passes through the subject also queues a delayed re-subscription to the
    /// receiver; `concatMap` runs those re-subscriptions one at a time, in order.
    ///
    /// The receiver's `completed` event is never forwarded (the first pass is
    /// concatenated with `.never()`, and inner completions only advance `concatMap`),
    /// so the result ends only on error or disposal.
    ///
    /// - Parameters:
    ///   - delay: How long to wait before each re-subscription to the receiver.
    ///   - scheduler: Scheduler that runs the delay and on which re-polled events are
    ///     handed back to the internal subject.
    /// - Returns: An observable that emits the elements of every pass over the receiver.
    func pollContinuously(delay: RxTimeInterval, scheduler: SchedulerType) -> Observable<Element> {
        return Observable<Element>.create { downstream in
            // Hub that both the first pass and every later pass feed into.
            let relay = PublishSubject<Element>()

            // Attach the subscriber first: a PublishSubject does not replay, so anything
            // pushed before this subscription would be lost.
            let forwarding = relay.subscribe(downstream)

            // Feedback loop: every element seen on the relay schedules one more delayed
            // pass over the receiver, whose elements are pushed back into the relay.
            let repolling = relay
                .concatMap { _ -> Observable<Element> in
                    let pause = Observable<Void>.just(()).delay(delay, scheduler: scheduler)
                    return pause.flatMap { _ in self }
                }
                .observe(on: scheduler)
                .subscribe(relay)

            // First pass. Appending `.never()` swallows the receiver's completion so the
            // relay stays open for the feedback loop.
            let firstPass = Observable.concat(self.asObservable(), .never())
                .subscribe(relay)

            return CompositeDisposable(forwarding, repolling, firstPass)
        }
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// Marble-style tests for `pollContinuously(delay:scheduler:)`.
///
/// NOTE(review): `createObservable(timeline:)`, `parseEventsAndTimes(timeline:values:)` and
/// `offsetTime(by:)` are helpers defined outside this file. Presumably each `|`-terminated
/// segment of a source timeline is what one successive subscription observes. Confirm
/// against the helper's implementation.
class PollContinuouslyTests: XCTestCase {
    /// One element per pass, one-second delay between passes, stopped after four elements.
    func test1() {
        // Arrange
        let testScheduler = TestScheduler(initialClock: 0)
        let polled = testScheduler.createObservable(timeline: "-A|-B|-C|-D|")
        // Shifted by 200, presumably to line up with the time at which
        // `TestScheduler.start` subscribes by default. TODO confirm.
        let expectedTimelines = parseEventsAndTimes(timeline: "-A--B--C--D|", values: { String($0) })
            .offsetTime(by: 200)

        // Act
        let recorded = testScheduler.start {
            polled
                .pollContinuously(delay: .seconds(1), scheduler: testScheduler)
                .take(4)
        }

        // Assert
        XCTAssertEqual(recorded.events, expectedTimelines[0])
    }

    /// Same source as `test1`, but with a two-second delay between passes.
    func test2() {
        // Arrange
        let testScheduler = TestScheduler(initialClock: 0)
        let polled = testScheduler.createObservable(timeline: "-A|-B|-C|-D|")
        let expectedTimelines = parseEventsAndTimes(timeline: "-A---B---C---D|", values: { String($0) })
            .offsetTime(by: 200)

        // Act
        let recorded = testScheduler.start {
            polled
                .pollContinuously(delay: .seconds(2), scheduler: testScheduler)
                .take(4)
        }

        // Assert
        XCTAssertEqual(recorded.events, expectedTimelines[0])
    }

    /// Two elements per pass, two-second delay, stopped after eight elements.
    func test2_2() {
        // Arrange
        let testScheduler = TestScheduler(initialClock: 0)
        let polled = testScheduler.createObservable(timeline: "-A-B|-C-D|-E-F|-G-H|")
        let expectedTimelines = parseEventsAndTimes(timeline: "-A-B-C-D--E-F--G-H|", values: { String($0) })
            .offsetTime(by: 200)

        // Act
        let recorded = testScheduler.start {
            polled
                .pollContinuously(delay: .seconds(2), scheduler: testScheduler)
                .take(8)
        }

        // Assert
        XCTAssertEqual(recorded.events, expectedTimelines[0])
    }
}
I realized what you meant, I think.
Using `let asyncScheduler = scheduler.async` instead of `scheduler` inside `pollContinuously`.
Thanks as always Daniel!
Yes, exactly. Either that or just pass `MainScheduler.asyncInstance` to `pollContinuously` in the first place.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
It's fully explained on line 141 of the link. I can't do better than it does.