Skip to content

Instantly share code, notes, and snippets.

@Vladlex
Created December 5, 2017 21:24
Show Gist options
  • Save Vladlex/ea02dba926feeb369edd621be424dc52 to your computer and use it in GitHub Desktop.
Save Vladlex/ea02dba926feeb369edd621be424dc52 to your computer and use it in GitHub Desktop.
RxSwift Extension: Collect until no event comes in a given time interval
public extension Observable {
    /// Collects incoming elements into a buffer and emits the buffer as one array
    /// once no new element has arrived for `delay`.
    ///
    /// Each source element is stored in an internal buffer and pokes a debounced trigger
    /// (scheduled on `MainScheduler.asyncInstance`). When the trigger fires, the buffered
    /// elements are emitted as a single array and the buffer is emptied. An empty buffer
    /// is never emitted.
    ///
    /// Completion and errors of the source are forwarded through the same debounced trigger,
    /// so when (and whether) pending elements are delivered before termination follows the
    /// semantics of RxSwift's `debounce` operator.
    ///
    /// - Parameters:
    ///   - delay: Quiet period that must elapse after the most recent element before the
    ///     collected elements are emitted.
    ///   - bufferMutator: Optional block that merges a new element into the buffer
    ///     (for example to de-duplicate or replace entries). If `nil`, new elements are
    ///     simply appended to the buffer.
    /// - Returns: An observable of arrays, each holding the elements collected since the
    ///   previous emission.
    func collect(untilNoEventComesIn delay: RxTimeInterval, bufferMutator: ((inout [E], E) -> ())? = nil) -> Observable<[E]> {
        return Observable<[E]>.create { observer in
            var buffer: [E] = []
            // Recursive on purpose: terminal events sent into `debouncer` from the source
            // handler may re-enter the trigger handler while the lock is already held.
            let lock = NSRecursiveLock()
            let debouncer = PublishSubject<Void>()

            // Fires once the source has been quiet for `delay`; flushes the buffer downstream.
            let debouncerDisposable = debouncer
                .debounce(delay, scheduler: MainScheduler.asyncInstance)
                .subscribe { event in
                    lock.lock()
                    defer { lock.unlock() }

                    switch event {
                    case .next:
                        let collected = buffer
                        guard !collected.isEmpty else {
                            return
                        }
                        buffer.removeAll()
                        observer.onNext(collected)
                    case .completed:
                        observer.onCompleted()
                    case .error(let error):
                        observer.onError(error)
                    }
                }

            // Feeds the buffer and restarts the quiet-period timer on every source element.
            let sourceDisposable = self.subscribe { event in
                lock.lock()
                defer { lock.unlock() }

                switch event {
                case .next(let element):
                    if let mutator = bufferMutator {
                        mutator(&buffer, element)
                    } else {
                        buffer.append(element)
                    }
                    // Explicit `()`: Swift 4+ no longer accepts an omitted argument for a `Void` parameter.
                    debouncer.onNext(())
                case .completed:
                    debouncer.onCompleted()
                case .error(let error):
                    debouncer.onError(error)
                }
            }

            return Disposables.create([sourceDisposable, debouncerDisposable])
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment