Skip to content

Instantly share code, notes, and snippets.

@dangthaison91
Forked from danielt1263/Buffers.swift
Created May 28, 2017 20:30
Show Gist options
  • Save dangthaison91/4ad50870600b8c9705406ec495122218 to your computer and use it in GitHub Desktop.
Save dangthaison91/4ad50870600b8c9705406ec495122218 to your computer and use it in GitHub Desktop.
// ObservableBuffer.swift
//
// Created by Daniel Tartaglia
// Copyright © 2017 Daniel Tartaglia. MIT License.
extension Observable {
/// collects elements from the source sequence until the boundary sequence fires. Then it emits the elements as an array and begins collecting again.
func buffer<U>(_ boundary: Observable<U>) -> Observable<[E]> {
return Observable<[E]>.create { observer in
var buffer: [E] = []
let lock = NSRecursiveLock()
let boundaryDisposable = boundary.subscribe { event in
lock.lock(); defer { lock.unlock() }
switch event {
case .next:
observer.onNext(buffer)
buffer = []
default:
break
}
}
let disposable = self.subscribe { event in
lock.lock(); defer { lock.unlock() }
switch event {
case .next(let element):
buffer.append(element)
case .completed:
observer.onNext(buffer)
observer.onCompleted()
case .error(let error):
observer.onError(error)
buffer = []
}
}
return Disposables.create([disposable, boundaryDisposable])
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment