Last active
October 16, 2023 15:44
-
-
Save danielt1263/ab32f91a27c8c8e6d2d0fa0aae6afa5c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// ThrottleDebounceLatest.swift | |
// | |
// Created by Daniel Tartaglia on 14 Oct 2023. | |
// Copyright © 2023 Daniel Tartaglia. MIT License. | |
// | |
import Foundation | |
import RxSwift | |
extension ObservableType { | |
/** | |
returns an Observable that emits the first item emitted by the source Observable then ignores elements from the | |
source which are followed by another element within a specified relative time duration, using the specified | |
scheduler to run throttling timers. | |
- parameter dueTime: Throttling duration for each element. | |
- parameter scheduler: Scheduler to run the throttle timers on. | |
- returns: The throttled sequence. | |
*/ | |
func throttleDebounceLatest(dueTime: RxTimeInterval, scheduler: SchedulerType) -> Observable<Element> { | |
Observable.create { observer in | |
var lastFire = RxTime?.none | |
var nextEmit = (Event<Element>, Disposable)?.none | |
let lock = NSRecursiveLock() | |
func delay(event: Event<Self.Element>) -> Disposable { | |
scheduler.scheduleRelative((), dueTime: dueTime) { | |
lock.lock(); defer { lock.unlock() } | |
observer.on(event) | |
nextEmit = nil | |
return Disposables.create() | |
} | |
} | |
return self.subscribe { event in | |
lock.lock(); defer { lock.unlock() } | |
switch event { | |
case .next: | |
if lastFire == nil || dueTime.asTimeInterval < scheduler.now.timeIntervalSince(lastFire!) { | |
observer.on(event) | |
} else { | |
nextEmit?.1.dispose() | |
nextEmit = (event, delay(event: event)) | |
} | |
lastFire = scheduler.now | |
case .error: | |
nextEmit?.1.dispose() | |
observer.on(event) | |
case .completed: | |
if let nextEmit { | |
nextEmit.1.dispose() | |
observer.on(nextEmit.0) | |
observer.on(event) | |
} else { | |
observer.on(event) | |
} | |
} | |
} | |
} | |
} | |
} | |
private extension DispatchTimeInterval { | |
var asTimeInterval: TimeInterval { | |
switch self { | |
case .nanoseconds(let val): return Double(val) / 1_000_000_000.0 | |
case .microseconds(let val): return Double(val) / 1_000_000.0 | |
case .milliseconds(let val): return Double(val) / 1_000.0 | |
case .seconds(let val): return Double(val) | |
case .never: return Double.infinity | |
@unknown default: fatalError() | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// ThrottleDebounceLatestTests.swift | |
// | |
// Created by Daniel Tartaglia on 14 Oct 2023. | |
// Copyright © 2023 Daniel Tartaglia. MIT License. | |
// | |
import RxTest | |
import XCTest | |
final class AccumulatingDebounceTests: XCTestCase { | |
func test() { | |
let scheduler = TestScheduler(initialClock: 0) | |
let values: [Character: [String]] = [ | |
"1": ["A", "B"], | |
"2": ["C", "D"], | |
] | |
let source = scheduler.createObservable(timeline: "-A-B--C-D|") | |
let expected = parseEventsAndTimes(timeline: "-----1--2|", values: { values[$0]! }) | |
.offsetTime(by: 200) | |
let actual = scheduler.start { | |
source.accumulatingDebounce(.seconds(2), scheduler: scheduler) | |
} | |
XCTAssertEqual(actual.events, expected[0]) | |
} | |
func test1() { | |
let scheduler = TestScheduler(initialClock: 0) | |
let values: [Character: [String]] = [ | |
"1": ["A"], | |
"2": ["B", "C"], | |
] | |
let source = scheduler.createObservable(timeline: "-A--B-C---|") | |
let expected = parseEventsAndTimes(timeline: "---1----2-|", values: { values[$0]! }) | |
.offsetTime(by: 200) | |
let actual = scheduler.start { | |
source.accumulatingDebounce(.seconds(2), scheduler: scheduler) | |
} | |
XCTAssertEqual(actual.events, expected[0]) | |
} | |
func test2() { | |
let scheduler = TestScheduler(initialClock: 0) | |
let values: [Character: [String]] = [ | |
"1": ["A", "B"], | |
] | |
let source = scheduler.createObservable(timeline: "-A-B|") | |
let expected = parseEventsAndTimes(timeline: "---1|", values: { values[$0]! }) | |
.offsetTime(by: 200) | |
let actual = scheduler.start { | |
source.accumulatingDebounce(.seconds(2), scheduler: scheduler) | |
} | |
XCTAssertEqual(actual.events, expected[0]) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment