Skip to content

Instantly share code, notes, and snippets.

@chorim
Created January 8, 2024 15:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save chorim/0da172a4600d19f19a7f09d37b3db9ea to your computer and use it in GitHub Desktop.
Save chorim/0da172a4600d19f19a7f09d37b3db9ea to your computer and use it in GitHub Desktop.
Publisher+RxThrottle
//
// Publisher+RxThrottle.swift
// ThrottleDemo
//
// Created by Insu Byeon on 1/7/24.
//
import Foundation
#if canImport(Combine)
import Combine
extension Publisher {
    /// Wraps this publisher in a `Publishers.RxThrottle`.
    ///
    /// - Parameters:
    ///   - interval: The stride, measured on `scheduler`, handed to the throttle publisher.
    ///   - scheduler: The scheduler handed to the throttle publisher for timing.
    ///   - latest: Forwarded unchanged to the throttle publisher. Defaults to `true`.
    /// - Returns: A `Publishers.RxThrottle` whose upstream is this publisher.
    func rx_throttle<S: Scheduler>(
        for interval: S.SchedulerTimeType.Stride,
        scheduler: S,
        latest: Bool = true
    ) -> Publishers.RxThrottle<Self, S> {
        let throttled = Publishers.RxThrottle(
            upstream: self,
            interval: interval,
            scheduler: scheduler,
            latest: latest
        )
        return throttled
    }
}
extension Publishers {
/// A publisher that routes `Upstream`'s events through an `RxThrottleSubscriber`
/// before they reach the attached subscriber.
///
/// The publisher itself only stores configuration; all throttling work is done by the
/// `RxThrottleSubscriber` created for each subscription.
struct RxThrottle<Upstream, SchedulerType>: Publisher where Upstream: Publisher, SchedulerType: Scheduler {
    typealias Output = Upstream.Output
    typealias Failure = Upstream.Failure

    /// The publisher whose values are throttled.
    let upstream: Upstream
    /// The throttle interval, expressed in the scheduler's stride type.
    let interval: SchedulerType.SchedulerTimeType.Stride
    /// The scheduler passed to the subscriber for timing.
    let scheduler: SchedulerType
    /// Passed through to `RxThrottleSubscriber` to select its throttling mode.
    let latest: Bool

    /// Stores the configuration; no subscription is made until `receive(subscriber:)`.
    init(
        upstream: Upstream,
        interval: SchedulerType.SchedulerTimeType.Stride,
        scheduler: SchedulerType,
        latest: Bool
    ) {
        self.upstream = upstream
        self.interval = interval
        self.scheduler = scheduler
        self.latest = latest
    }

    /// Creates a fresh `RxThrottleSubscriber` around `subscriber` and subscribes it to `upstream`.
    func receive<S: Subscriber>(subscriber: S) where S.Failure == Upstream.Failure, S.Input == Upstream.Output {
        upstream.subscribe(
            RxThrottleSubscriber(
                subscriber: subscriber,
                interval: interval,
                scheduler: scheduler,
                latest: latest
            )
        )
    }
}
/// Sits between an upstream publisher and a downstream subscriber and rate-limits the
/// values that pass through.
///
/// The object plays two roles:
/// - as a `Subscriber` it receives every upstream value (it requests `.unlimited`);
/// - as a `Subscription` it is handed to the downstream so that the downstream can
///   signal demand and cancel the upstream subscription and any running timer.
///
/// Throttling modes:
/// - `latest == true`: the first value is delivered immediately and opens a repeating
///   timer. Values arriving while the timer runs replace the pending value; each tick
///   delivers the pending value, or stops the timer when nothing is pending.
/// - `latest == false`: the first value is delivered immediately and every further value
///   is dropped until `interval` has elapsed.
///
/// Values are only delivered while the downstream has outstanding demand; a value that
/// comes due without demand is dropped.
///
/// NOTE(review): state is mutated without locking. This assumes upstream events,
/// downstream requests and scheduler callbacks are delivered on the same serial
/// context — confirm against callers.
final class RxThrottleSubscriber<Downstream: Subscriber, SchedulerType>: Subscriber, Subscription where SchedulerType: Scheduler {
    typealias Input = Downstream.Input
    typealias Failure = Downstream.Failure

    private var downstream: Downstream? = nil
    private var upstream: Subscription? = nil
    private let interval: SchedulerType.SchedulerTimeType.Stride
    private let scheduler: SchedulerType
    private let latest: Bool
    /// Most recent value received while the `latest` timer is running and not yet delivered.
    private var pendingValue: Input? = nil
    /// `true` while values are being dropped in the `latest == false` mode.
    private var isWindowOpen: Bool = false
    /// Demand signalled by the downstream that has not been consumed yet.
    private var requestedDemand: Subscribers.Demand = .none
    /// Repeating timer used by the `latest == true` mode; `nil` when no window is open.
    private var throttleTimer: Cancellable? = nil

    /// - Parameters:
    ///   - subscriber: The downstream subscriber that receives throttled values.
    ///   - interval: Length of one throttle window.
    ///   - scheduler: Scheduler used to time the windows.
    ///   - latest: Selects the throttling mode described on the type.
    init(
        subscriber: Downstream,
        interval: SchedulerType.SchedulerTimeType.Stride,
        scheduler: SchedulerType,
        latest: Bool
    ) {
        self.downstream = subscriber
        self.interval = interval
        self.scheduler = scheduler
        self.latest = latest
    }

    // MARK: - Subscriber

    /// Stores the upstream subscription, hands `self` to the downstream as its
    /// subscription, then requests unlimited values from upstream.
    func receive(subscription: Subscription) {
        // A subscriber may only hold one subscription; reject any further one.
        guard upstream == nil else {
            subscription.cancel()
            return
        }
        upstream = subscription
        // The downstream gets its subscription first so its demand is known before
        // any value can arrive.
        downstream?.receive(subscription: self)
        // `upstream` is nil again if the downstream cancelled synchronously above.
        upstream?.request(.unlimited)
    }

    /// Feeds a value into the throttle. Always returns `.none` because unlimited demand
    /// was already requested from upstream.
    func receive(_ input: Downstream.Input) -> Subscribers.Demand {
        // Ignore values that arrive after cancellation or completion.
        guard downstream != nil else { return .none }
        if latest {
            throttleKeepingLatest(input)
        } else {
            throttleKeepingFirst(input)
        }
        return .none
    }

    /// Stops the timer, drops any pending value and forwards the completion downstream.
    func receive(completion: Subscribers.Completion<Downstream.Failure>) {
        throttleTimer?.cancel()
        throttleTimer = nil
        pendingValue = nil
        upstream = nil
        // Release the downstream before calling it so re-entrant calls see a finished state.
        let subscriber = downstream
        downstream = nil
        subscriber?.receive(completion: completion)
    }

    // MARK: - Subscription

    /// Records additional downstream demand.
    func request(_ demand: Subscribers.Demand) {
        requestedDemand += demand
    }

    /// Stops the timer, releases the downstream and cancels the upstream subscription.
    func cancel() {
        throttleTimer?.cancel()
        throttleTimer = nil
        pendingValue = nil
        downstream = nil
        let subscription = upstream
        upstream = nil
        subscription?.cancel()
    }

    // MARK: - Throttling

    /// `latest == true` mode: deliver the leading value, then deliver the newest pending
    /// value on every timer tick.
    private func throttleKeepingLatest(_ input: Input) {
        guard throttleTimer == nil else {
            // A window is open: remember only the newest value for the next tick.
            pendingValue = input
            return
        }
        // Leading edge. The value is delivered now and is NOT kept as pending, so the
        // first tick cannot emit it a second time.
        deliver(input)
        // The downstream may have cancelled while receiving the value.
        guard downstream != nil else { return }
        throttleTimer = scheduler.schedule(
            after: scheduler.now.advanced(by: interval),
            interval: interval,
            tolerance: .zero,
            options: nil
        ) { [weak self] in
            self?.throttleWindowElapsed()
        }
    }

    /// Timer tick for the `latest == true` mode.
    private func throttleWindowElapsed() {
        if let value = pendingValue {
            pendingValue = nil
            deliver(value)
        } else {
            // Nothing arrived during the last window: close it so the next value is
            // delivered immediately again.
            throttleTimer?.cancel()
            throttleTimer = nil
        }
    }

    /// `latest == false` mode: deliver the first value and drop everything else until
    /// `interval` has elapsed.
    private func throttleKeepingFirst(_ input: Input) {
        guard !isWindowOpen else { return }
        isWindowOpen = true
        deliver(input)
        scheduler.schedule(
            after: scheduler.now.advanced(by: interval),
            tolerance: .zero,
            options: nil
        ) { [weak self] in
            self?.isWindowOpen = false
        }
    }

    /// Sends `value` downstream if there is outstanding demand and accounts for the
    /// demand consumed and returned by that call.
    private func deliver(_ value: Input) {
        guard let subscriber = downstream, requestedDemand > 0 else { return }
        requestedDemand -= 1
        // Evaluate the call before touching `requestedDemand`, because the downstream
        // may call `request(_:)` re-entrantly.
        let additional = subscriber.receive(value)
        requestedDemand += additional
    }
}
}
#endif
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment