Skip to content

Instantly share code, notes, and snippets.

@xavierLowmiller
Created April 19, 2020 06:22
Show Gist options
  • Save xavierLowmiller/e9cbd460a4f8ef4cf16cfa4e181c9351 to your computer and use it in GitHub Desktop.
Save xavierLowmiller/e9cbd460a4f8ef4cf16cfa4e181c9351 to your computer and use it in GitHub Desktop.
// Based on this StackOverflow answer: https://stackoverflow.com/a/61273595/4239752
import Foundation
import Combine
/// One step in the merged element/boundary timeline consumed by `Publisher.buffer(_:)`.
private enum BufferEvent<Element> {
    /// The source publisher produced a value.
    case element(Element)
    /// The boundary publisher produced a value: emit the pending batch.
    case flush
    /// The source publisher finished: emit whatever is still pending.
    case end
    /// Sentinel that follows `.end` so the pipeline can complete right after the final batch.
    case stop
}

extension Publisher {
    /// Collects elements from this publisher until `boundary` emits, then publishes the
    /// collected elements as an array and starts collecting again.
    ///
    /// - A batch is published for every `boundary` output, even when no elements are pending
    ///   (an empty array is published in that case).
    /// - When this publisher finishes, the remaining elements are published as a final batch
    ///   (possibly empty) and the returned publisher finishes.
    /// - When this publisher fails, pending elements are discarded and the failure is forwarded.
    /// - Completion of `boundary`, whether `.finished` or `.failure`, is ignored.
    ///
    /// Upstream subscriptions are created per downstream subscriber, at subscription time, so
    /// values emitted synchronously by the source (e.g. `[1, 2, 3].publisher`) are not lost.
    ///
    /// - Parameter boundary: Publisher whose outputs mark the end of each batch. The output
    ///   values themselves are not used.
    /// - Returns: A publisher of batches sharing this publisher's `Failure` type.
    func buffer<T: Publisher, U>(_ boundary: T) -> AnyPublisher<[Output], Failure> where T.Output == U {
        typealias Event = BufferEvent<Output>

        // Source values, followed by `.end` and `.stop` once the source finishes normally.
        // On failure nothing is appended, so the failure is forwarded as-is.
        let elements = self
            .map { Event.element($0) }
            .append(Event.end, Event.stop)

        // Boundary outputs become flush requests. A boundary failure is replaced by an empty
        // stream so that it never terminates the returned publisher.
        let flushes = boundary
            .map { _ in Event.flush }
            .catch { _ in Empty<Event, Never>() }
            .setFailureType(to: Failure.self)

        // `merge` only completes when both inputs have completed; cutting the stream at `.stop`
        // completes it as soon as the source is done and cancels the boundary subscription.
        // NOTE(review): this relies on `merge` delivering events to its downstream serially
        // when the two publishers emit on different threads — confirm for your use case.
        let events = elements
            .merge(with: flushes)
            .prefix(while: { event -> Bool in
                if case .stop = event { return false }
                return true
            })

        // `Deferred` gives every subscriber its own `pending` storage.
        return Deferred { () -> AnyPublisher<[Output], Failure> in
            var pending: [Output] = []
            return events
                .compactMap { event -> [Output]? in
                    switch event {
                    case .element(let value):
                        pending.append(value)
                        return nil
                    case .flush, .end:
                        let batch = pending
                        pending = []
                        return batch
                    case .stop:
                        // Filtered out by `prefix(while:)`; listed to keep the switch exhaustive.
                        return nil
                    }
                }
                .eraseToAnyPublisher()
        }
        .eraseToAnyPublisher()
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment