Last active
February 20, 2024 13:41
-
-
Save danielt1263/820bd8091048aa138c82151966f7c66d to your computer and use it in GitHub Desktop.
The pollContinuously operator is useful if you want to subscribe to an Observable again after it finishes...
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// PollContinuously+Rx.swift | |
// | |
// Created by Daniel Tartaglia on 20 Jun 2023. | |
// Copyright © 2023 Daniel Tartaglia. MIT License. | |
// | |
import RxSwift | |
extension ObservableType {
    /// Subscribes to the receiver and keeps re-subscribing to it, merging every run into one stream.
    ///
    /// All elements are funnelled through an internal `PublishSubject`. For each element that passes
    /// through that subject, a new subscription to the receiver is queued; `concatMap` runs the queued
    /// subscriptions one at a time, each preceded by a wait of `delay`.
    ///
    /// The receiver's own completion is not forwarded: the first run is concatenated with `.never()`,
    /// so the returned sequence only ends when the subscriber disposes it (e.g. via `take`).
    ///
    /// - Parameters:
    ///   - delay: How long to wait before each re-subscription starts.
    ///   - scheduler: Scheduler used both for the wait and for delivering re-subscribed elements.
    /// - Returns: An observable that emits the elements of every run of the receiver.
    ///
    /// NOTE(review): a re-subscription is queued per *element*, not per completion, so a receiver that
    /// emits several elements per run presumably queues work faster than it is consumed — confirm
    /// this is acceptable for long-lived subscriptions.
    func pollContinuously(delay: RxTimeInterval, scheduler: SchedulerType) -> Observable<Element> {
        Observable<Element>.create { observer in
            let relay = PublishSubject<Element>()

            // Attach the downstream observer first so nothing pushed into the relay is missed.
            let downstream = relay.subscribe(observer)

            // Every relayed element schedules one delayed re-subscription to the receiver.
            let resubscriptions = relay.concatMap { _ in
                Observable.just(())
                    .delay(delay, scheduler: scheduler)
                    .flatMap { self }
            }
            // Feed the re-subscribed elements back into the relay, hopping onto `scheduler` first.
            let feedback = resubscriptions
                .observe(on: scheduler)
                .subscribe(relay)

            // Kick things off with the first run; `.never()` keeps its completion away from the relay.
            let firstRun = Observable.concat(self.asObservable(), .never())
            let upstream = firstRun.subscribe(relay)

            return CompositeDisposable(downstream, feedback, upstream)
        }
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// Virtual-time tests for `pollContinuously(delay:scheduler:)`.
///
/// Each test describes the source and the expected output as marble-style timeline strings and
/// compares the events recorded by a `TestScheduler` against the parsed expectation.
class PollContinuouslyTests: XCTestCase {
    /// One element per run, one-second delay between runs.
    func test1() {
        verifyPolling(source: "-A|-B|-C|-D|", expected: "-A--B--C--D|", delaySeconds: 1, count: 4)
    }

    /// One element per run, two-second delay between runs.
    func test2() {
        verifyPolling(source: "-A|-B|-C|-D|", expected: "-A---B---C---D|", delaySeconds: 2, count: 4)
    }

    /// Two elements per run, two-second delay between runs.
    func test2_2() {
        verifyPolling(
            source: "-A-B|-C-D|-E-F|-G-H|",
            expected: "-A-B-C-D--E-F--G-H|",
            delaySeconds: 2,
            count: 8
        )
    }

    /// Runs `pollContinuously` over `sourceTimeline` on a fresh `TestScheduler` and asserts that the
    /// first `count` recorded events match `expectedTimeline`.
    ///
    /// - Parameters:
    ///   - sourceTimeline: Timeline string handed to `createObservable(timeline:)`.
    ///   - expectedTimeline: Timeline string describing the events that should be recorded.
    ///   - delaySeconds: Delay passed to `pollContinuously`, in seconds.
    ///   - count: Number of elements to take before the result completes.
    ///   - file: Forwarded to the assertion so a failure is reported in the calling test.
    ///   - line: Forwarded to the assertion so a failure is reported in the calling test.
    private func verifyPolling(
        source sourceTimeline: String,
        expected expectedTimeline: String,
        delaySeconds: Int,
        count: Int,
        file: StaticString = #filePath,
        line: UInt = #line
    ) {
        let testScheduler = TestScheduler(initialClock: 0)
        let polled = testScheduler.createObservable(timeline: sourceTimeline)

        // Shifted by 200 — presumably to line up with the virtual time at which `start` subscribes.
        let expectedTimelines = parseEventsAndTimes(timeline: expectedTimeline, values: { String($0) })
            .offsetTime(by: 200)

        let recording = testScheduler.start {
            polled.pollContinuously(delay: .seconds(delaySeconds), scheduler: testScheduler)
                .take(count)
        }

        XCTAssertEqual(recording.events, expectedTimelines[0], file: file, line: line)
    }
}
Yes, exactly. Either that or just pass `MainScheduler.asyncInstance` to `pollContinuously` in the first place.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I realized what you meant, I think.
Using `let asyncScheduler = scheduler.async` instead of `scheduler` inside `pollContinuously`. Thanks as always, Daniel!