Skip to content

Instantly share code, notes, and snippets.

@danielt1263
Last active March 10, 2023 20:02
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save danielt1263/1ab30c2ec6233c0967fab7b2811ae81a to your computer and use it in GitHub Desktop.
Save danielt1263/1ab30c2ec6233c0967fab7b2811ae81a to your computer and use it in GitHub Desktop.
Created by request. This operator works like the `debounce` operator except it emits all the elements that were created during the wait time.
//
// AccumulatingDebounce.swift
//
// Created by Daniel Tartaglia on 10 Mar 2023.
// Copyright © 2023 Daniel Tartaglia. MIT License.
//
import Foundation
import RxSwift
extension ObservableType {
    /**
     Collects the elements of an observable sequence and emits them together as an array once no new
     element has arrived for the specified relative time duration. Works like `debounce`, except that
     every element received during the wait is delivered instead of only the last one.

     Elements still buffered when the source completes are emitted immediately before the completion
     event. Elements still buffered when the source errors are discarded and the error is forwarded.

     - Parameters:
       - dueTime: Quiet period that must elapse after the most recent element before the buffer is emitted.
       - scheduler: Scheduler to run the debounce timers on.
     - Returns: A sequence of arrays, each holding the elements gathered before a quiet period elapsed.
     */
    func accumulatingDebounce(_ dueTime: RxTimeInterval, scheduler: SchedulerType) -> Observable<[Element]> {
        Observable.create { observer in
            var buffer = [Element]()
            // Bumped on every source event so that a timer which has already fired, but is still
            // waiting for the lock, can tell it has been superseded and must not flush.
            var generation: UInt64 = 0
            // Holds the currently pending timer. Assigning a new timer disposes the previous one, and
            // disposing the returned subscription cancels whichever timer is pending at that moment.
            let timer = SerialDisposable()
            let lock = NSRecursiveLock()

            let subscription = self.subscribe { event in
                lock.lock(); defer { lock.unlock() }
                switch event {
                case .next(let element):
                    buffer.append(element)
                    generation &+= 1
                    let scheduledGeneration = generation
                    timer.disposable = scheduler.scheduleRelative((), dueTime: dueTime) { _ in
                        lock.lock(); defer { lock.unlock() }
                        // Ignore stale timers and never emit an empty batch.
                        guard scheduledGeneration == generation, !buffer.isEmpty else {
                            return Disposables.create()
                        }
                        // Clear before emitting so an element delivered re-entrantly from within
                        // `onNext` starts a fresh batch instead of being wiped out afterwards.
                        let batch = buffer
                        buffer = []
                        observer.onNext(batch)
                        return Disposables.create()
                    }
                case .error(let error):
                    generation &+= 1
                    timer.dispose()
                    buffer = []
                    observer.onError(error)
                case .completed:
                    generation &+= 1
                    timer.dispose()
                    let batch = buffer
                    buffer = []
                    if !batch.isEmpty { observer.onNext(batch) }
                    observer.onCompleted()
                }
            }
            return Disposables.create(subscription, timer)
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment