Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Unordered batch subscribe.
extension Sequence where Element: Publisher {
func bufferedSubscribe(by size: Int) -> AnyPublisher<[Element.Output], Element.Failure> {
publisher
.buffer(size: .max, prefetch: .byRequest, whenFull: .dropOldest) // Since we’re
// buffering up to `Int.max` elements _by request_ — to avoid flooding the buffer — ,
// `BufferingStrategy.dropOldest` or `.dropNewest` likely work equally well here.
.flatMap(maxPublishers: .max(size)) { $0 }
.collect(size)
.eraseToAnyPublisher()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment