Skip to content

Instantly share code, notes, and snippets.

@toshi0383
Created October 6, 2018 02:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save toshi0383/bf567dfa62389f8356b13844fa15b2ed to your computer and use it in GitHub Desktop.
Save toshi0383/bf567dfa62389f8356b13844fa15b2ed to your computer and use it in GitHub Desktop.
import Foundation
import RxSwift
extension ObservableType {
    /**
     Throttles the source sequence with an interval that is recomputed after every immediately emitted element
     and that can be reset by a second sequence.

     An element that arrives once the current interval has elapsed is emitted right away; `dueTime` is then called
     to obtain the interval that applies to the following elements. Elements arriving before the interval has
     elapsed are dropped, unless `latest` is `true`, in which case the most recent of them is emitted when the
     interval runs out.

     - seealso: [debounce operator on reactivex.io](http://reactivex.io/documentation/operators/debounce.html)

     - parameter dueTime: Called with the element being emitted and the interval currently in effect; returns the throttling interval to use next.
     - parameter until: Each `next` event of this sequence resets the throttling interval to 0.
     - parameter latest: Whether the latest element received while throttled should be emitted once the interval elapses. Defaults to `false`.
     - parameter scheduler: Scheduler that supplies the current time and runs the throttle timers.
     - returns: The throttled sequence.
     */
    public func throttle<O: ObservableType>(dueTime: @escaping (E, RxTimeInterval) -> RxTimeInterval, until: O, latest: Bool = false, scheduler: SchedulerType)
        -> Observable<E> {
        let upstream = asObservable()
        let resetTrigger = until.asObservable()
        return Throttle2(source: upstream,
                         dueTime: dueTime,
                         until: resetTrigger,
                         latest: latest,
                         scheduler: scheduler)
    }
}
/// Sink that implements the throttling for `Throttle2`.
///
/// All mutable state is guarded by `_lock`: source events go through `synchronizedOn`, while the timer
/// callback (`propagate`) and the `until` reset (`resetThrottle`) take the lock themselves.
final fileprivate class ThrottleSink2<O: ObserverType, UntilElement>
    : Sink<O>
    , ObserverType
    , LockOwnerType
    , SynchronizedOnType {
    typealias Element = O.E
    typealias ParentType = Throttle2<Element, UntilElement>

    private let _parent: ParentType

    let _lock = RecursiveLock()

    // state
    /// Latest element received while throttled that is still waiting for the timer (only used when `latest` is on).
    private var _lastUnsentElement: Element? = nil
    /// Scheduler time taken right after the last element was forwarded; `nil` before the first emission and after a reset.
    private var _lastSentTime: Date? = nil
    /// Set when the source completed while an element was still pending; completion is then forwarded later.
    private var _completed: Bool = false
    /// Throttling interval currently in effect.
    private var _currentDueTime: RxTimeInterval = 0

    /// Holds the timer that will emit the pending element.
    let cancellable = SerialDisposable()

    init(parent: ParentType, observer: O, cancel: Cancelable) {
        _parent = parent
        super.init(observer: observer, cancel: cancel)
    }

    /// Subscribes to the reset sequence and then to the source.
    ///
    /// - returns: A disposable covering both subscriptions and the pending timer.
    func run() -> Disposable {
        let untilSubscription = _parent._until.subscribe(onNext: { [weak self] _ in
            self?.resetThrottle()
        })
        let subscription = _parent._source.subscribe(self)
        return Disposables.create(subscription, untilSubscription, cancellable)
    }

    func on(_ event: Event<Element>) {
        synchronizedOn(event)
    }

    /// Handles a source event while holding `_lock`.
    func _synchronized_on(_ event: Event<Element>) {
        switch event {
        case .next(let element):
            let now = _parent._scheduler.now

            // With no previous emission the element is treated as if the whole interval had already elapsed.
            let timeIntervalSinceLast: RxTimeInterval
            if let lastSendingTime = _lastSentTime {
                timeIntervalSinceLast = now.timeIntervalSince(lastSendingTime)
            }
            else {
                timeIntervalSinceLast = _currentDueTime
            }

            let couldSendNow = timeIntervalSinceLast >= _currentDueTime
            if couldSendNow {
                // Ask for the interval that applies after this element, then emit it.
                _currentDueTime = _parent._dueTime(element, _currentDueTime)
                sendNow(element: element)
                return
            }

            if !_parent._latest {
                return
            }

            // Remember only the newest throttled element; a timer is scheduled just once per window.
            let isThereAlreadyInFlightRequest = _lastUnsentElement != nil
            _lastUnsentElement = element
            if isThereAlreadyInFlightRequest {
                return
            }

            let scheduler = _parent._scheduler
            let d = SingleAssignmentDisposable()
            cancellable.disposable = d
            // Fire when the remainder of the current interval has passed.
            d.setDisposable(scheduler.scheduleRelative(0, dueTime: _currentDueTime - timeIntervalSinceLast, action: self.propagate))
        case .error:
            _lastUnsentElement = nil
            forwardOn(event)
            dispose()
        case .completed:
            if _lastUnsentElement != nil {
                // Defer completion until the pending element has been emitted (or discarded by a reset).
                _completed = true
            }
            else {
                forwardOn(.completed)
                dispose()
            }
        }
    }

    /// Resets throttling in response to a `next` event of the `until` sequence.
    ///
    /// Runs under `_lock` because it touches the same state as `_synchronized_on` and `propagate`.
    /// The pending timer is cancelled, so the element waiting for it is discarded as well; if completion
    /// was being held back for that element it is forwarded here, otherwise nothing would ever deliver it.
    private func resetThrottle() {
        _lock.lock(); defer { _lock.unlock() }

        cancellable.disposable.dispose()
        _currentDueTime = 0
        _lastSentTime = nil
        _lastUnsentElement = nil

        if _completed {
            forwardOn(.completed)
            dispose()
        }
    }

    /// Forwards `element` downstream and records the emission time.
    private func sendNow(element: Element) {
        _lastUnsentElement = nil
        forwardOn(.next(element))
        // in case element processing takes a while, this should give some more room
        _lastSentTime = _parent._scheduler.now
    }

    /// Timer callback: emits the pending element, if any, and forwards a deferred completion.
    func propagate(_: Int) -> Disposable {
        _lock.lock(); defer { _lock.unlock() }

        if let lastUnsentElement = _lastUnsentElement {
            sendNow(element: lastUnsentElement)
        }

        if _completed {
            forwardOn(.completed)
            dispose()
        }

        return Disposables.create()
    }
}
/// Producer behind `throttle(dueTime:until:latest:scheduler:)`: keeps the operator's configuration
/// and builds a `ThrottleSink2` in `run`.
fileprivate final class Throttle2<Element, UntilElement> : Producer<Element> {
    /// Sequence whose elements are throttled.
    fileprivate let _source: Observable<Element>
    /// Called by the sink with an emitted element and the current interval to obtain the next interval.
    fileprivate let _dueTime: (Element, RxTimeInterval) -> RxTimeInterval
    /// Sequence whose `next` events reset the throttling interval.
    fileprivate let _until: Observable<UntilElement>
    /// Whether the sink keeps the latest throttled element and emits it when the interval elapses.
    fileprivate let _latest: Bool
    /// Scheduler the sink reads the current time from and schedules its timer on.
    fileprivate let _scheduler: SchedulerType

    init(source: Observable<Element>, dueTime: @escaping (Element, RxTimeInterval) -> RxTimeInterval, until: Observable<UntilElement>, latest: Bool, scheduler: SchedulerType) {
        self._source = source
        self._dueTime = dueTime
        self._until = until
        self._latest = latest
        self._scheduler = scheduler
    }

    /// Creates the sink for `observer` and starts it.
    ///
    /// - returns: The sink together with the disposable returned by the sink's `run()`.
    override func run<O: ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        let throttleSink = ThrottleSink2(parent: self, observer: observer, cancel: cancel)
        return (sink: throttleSink, subscription: throttleSink.run())
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment