Created
January 8, 2024 15:48
-
-
Save chorim/0da172a4600d19f19a7f09d37b3db9ea to your computer and use it in GitHub Desktop.
Publisher+RxThrottle
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// Publisher+RxThrottle.swift | |
// ThrottleDemo | |
// | |
// Created by Insu Byeon on 1/7/24. | |
// | |
import Foundation | |
#if canImport(Combine) | |
import Combine | |
extension Publisher { | |
func rx_throttle<S>( | |
for interval: S.SchedulerTimeType.Stride, | |
scheduler: S, | |
latest: Bool = true | |
) -> Publishers.RxThrottle<Self, S> where S: Scheduler { | |
return Publishers.RxThrottle( | |
upstream: self, | |
interval: interval, | |
scheduler: scheduler, | |
latest: latest | |
) | |
} | |
} | |
extension Publishers { | |
struct RxThrottle<Upstream, SchedulerType>: Publisher where Upstream: Publisher, SchedulerType: Scheduler { | |
typealias Output = Upstream.Output | |
typealias Failure = Upstream.Failure | |
let upstream: Upstream | |
let interval: SchedulerType.SchedulerTimeType.Stride | |
let scheduler: SchedulerType | |
let latest: Bool | |
init(upstream: Upstream, interval: SchedulerType.SchedulerTimeType.Stride, scheduler: SchedulerType, latest: Bool) { | |
self.upstream = upstream | |
self.interval = interval | |
self.scheduler = scheduler | |
self.latest = latest | |
} | |
func receive<S>(subscriber: S) where S : Subscriber, Upstream.Failure == S.Failure, Upstream.Output == S.Input { | |
let subscription = RxThrottleSubscriber( | |
subscriber: subscriber, | |
interval: interval, | |
scheduler: scheduler, | |
latest: latest | |
) | |
upstream.subscribe(subscription) | |
} | |
} | |
final class RxThrottleSubscriber<Downstream: Subscriber, SchedulerType>: Subscriber where SchedulerType: Scheduler { | |
typealias Input = Downstream.Input | |
typealias Failure = Downstream.Failure | |
private var downstream: Downstream? = nil | |
private let interval: SchedulerType.SchedulerTimeType.Stride | |
private let scheduler: SchedulerType | |
private let latest: Bool | |
private var previousValue: Input? = nil | |
private var isRequesting: Bool = false | |
private var requestedDemand: Subscribers.Demand = .none | |
private var throttleTimer: Cancellable? = nil | |
init( | |
subscriber: Downstream, | |
interval: SchedulerType.SchedulerTimeType.Stride, | |
scheduler: SchedulerType, | |
latest: Bool | |
) { | |
self.downstream = subscriber | |
self.interval = interval | |
self.scheduler = scheduler | |
self.latest = latest | |
} | |
func receive(subscription: Subscription) { | |
subscription.request(.unlimited) | |
} | |
func receive(_ input: Downstream.Input) -> Subscribers.Demand { | |
previousValue = input | |
startThrottleTimer(input) | |
return .none | |
} | |
func receive(completion: Subscribers.Completion<Downstream.Failure>) { | |
throttleTimer?.cancel() | |
downstream?.receive(completion: completion) | |
} | |
func cancel() { | |
throttleTimer?.cancel() | |
throttleTimer = nil | |
downstream = nil | |
} | |
private func startThrottleTimer(_ input: Downstream.Input) { | |
guard !isRequesting else { return } | |
isRequesting = true | |
if latest { | |
if throttleTimer == nil { | |
_ = downstream?.receive(input) | |
throttleTimer = scheduler.schedule(after: scheduler.now.advanced(by: interval), interval: interval, tolerance: .zero, options: nil) { [weak self] in | |
if let previousValue = self?.previousValue { | |
_ = self?.downstream?.receive(previousValue) | |
self?.previousValue = nil | |
} else { | |
self?.throttleTimer?.cancel() | |
self?.throttleTimer = nil | |
} | |
self?.isRequesting = false | |
} | |
} | |
} else { | |
_ = downstream?.receive(input) | |
scheduler.schedule(after: scheduler.now.advanced(by: interval), tolerance: .zero, options: nil) { [weak self] in | |
self?.isRequesting = false | |
} | |
} | |
} | |
} | |
} | |
#endif |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment