Skip to content

Instantly share code, notes, and snippets.

@felixvisee
Last active June 20, 2019 17:54
Show Gist options
  • Save felixvisee/87420c98ecfd95461e4c644500c124f1 to your computer and use it in GitHub Desktop.
Save felixvisee/87420c98ecfd95461e4c644500c124f1 to your computer and use it in GitHub Desktop.
import PlaygroundSupport
// Keep the playground page executing after the top-level code has run; the handler below
// finishes asynchronously on the main queue, so its callbacks would otherwise never fire.
PlaygroundPage.current.needsIndefiniteExecution = true
import Combine
import Dispatch
/// Returns a subscriber that buffers up to `size` values before calling the given `handler`. The
/// subscriber will request more values from the upstream publisher once the `completion` closure
/// given to the `handler` is called.
///
/// When the upstream publisher completes (with success or failure), any values still sitting in
/// the buffer are handed to `handler` as one final, possibly shorter, batch.
///
/// - Parameters:
///   - size: The number of values per batch, and the demand requested from upstream each time.
///     Must be greater than zero.
///   - handler: Called with each batch. It must invoke the supplied `completion` closure when it
///     is ready for the next batch; until then no further demand is signalled upstream.
/// - Returns: A type-erased subscriber implementing the batching behavior described above.
func subscriber<Output, Failure: Error>(size: Int, handler: @escaping (_ values: [Output], _ completion: @escaping () -> ()) -> ()) -> AnySubscriber<Output, Failure> {
    // A batch size of zero would request no demand and stall forever; a negative one traps
    // inside `Subscribers.Demand.max`. Fail early with a clear message instead.
    precondition(size > 0, "subscriber(size:handler:) requires a size greater than zero")

    // Only read and written from the upstream callbacks below, never from `handler`'s completion.
    var subscription: Subscription?
    var buffer: [Output] = []

    func receiveSubscription(_ sub: Subscription) {
        // A subscriber may only hold one subscription at a time; reject any additional one
        // instead of silently dropping the subscription that is already in use.
        guard subscription == nil else {
            sub.cancel()
            return
        }
        subscription = sub // Retain the subscription so that we can request more values at any time.
        sub.request(.max(size))
    }

    func receiveValue(_ value: Output) -> Subscribers.Demand {
        buffer.append(value)
        guard buffer.count >= size else { return .none }

        let values = buffer
        buffer = [] // Reset the buffer in case the upstream publisher completes.

        // Capture the subscription by value so the completion closure never touches the mutable
        // `subscription` variable, and becomes a no-op if no subscription was ever received.
        let upstream = subscription
        handler(values) {
            // NOTE(review): this runs on whatever thread `handler` calls `completion` from, so it
            // relies on the upstream subscription accepting `request(_:)` there — confirm for the
            // publisher in use.
            upstream?.request(.max(size))
        }
        return .none // Further demand is only signalled through the completion closure above.
    }

    func receiveCompletion(_ completion: Subscribers.Completion<Failure>) {
        // The stream is over: release the subscription so it is not retained indefinitely.
        subscription = nil

        guard !buffer.isEmpty else { return }
        let values = buffer
        buffer = []
        handler(values) {
            // We no longer care about the handler calling our completion closure.
        }
    }

    return AnySubscriber(receiveSubscription: receiveSubscription, receiveValue: receiveValue, receiveCompletion: receiveCompletion)
}
/// Example batch handler: logs the received values, then signals that it is finished one second
/// later on the main queue by logging again and invoking `completion`.
///
/// - Parameters:
///   - values: The batch of values to log.
///   - completion: Invoked once, after the one-second delay has elapsed.
func handler<Output>(_ values: [Output], completion: @escaping () -> ()) {
    print("handler received \(values)")

    // Simulate asynchronous work before asking for the next batch.
    let deadline = DispatchTime.now() + .seconds(1)
    DispatchQueue.main.asyncAfter(deadline: deadline, execute: {
        print("handler completed")
        completion()
    })
}
// Demo: publish 1 through 10, log every value as it is sent, and consume the values in batches
// of two. `publisher` is a property on `Sequence` in the released Combine API (it was a method
// only in the early betas), so it is accessed without parentheses.
(1...10)
    .publisher
    .handleEvents(receiveOutput: { print("publisher sent \($0)") })
    .subscribe(subscriber(size: 2, handler: handler))
@felixvisee
Copy link
Author

Output:

publisher sent 1
publisher sent 2
handler received [1, 2]
handler completed
publisher sent 3
publisher sent 4
handler received [3, 4]
handler completed
publisher sent 5
publisher sent 6
handler received [5, 6]
handler completed
publisher sent 7
publisher sent 8
handler received [7, 8]
handler completed
publisher sent 9
publisher sent 10
handler received [9, 10]
handler completed

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment