Last active
June 20, 2019 17:54
-
-
Save felixvisee/87420c98ecfd95461e4c644500c124f1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import PlaygroundSupport | |
PlaygroundPage.current.needsIndefiniteExecution = true | |
import Combine | |
import Dispatch | |
/// Returns a subscriber that buffers up to `size` values before calling the given `handler`.
///
/// The subscriber initially requests `size` values from upstream. Each time exactly `size` values
/// have been collected they are passed to `handler` as one batch, and no further values are
/// requested until the `completion` closure given to `handler` is called. When upstream completes
/// (whether it finished or failed), any values still buffered are delivered as a final, possibly
/// shorter, batch.
///
/// - Parameters:
///   - size: The number of values per batch, and the demand requested from upstream each time.
///   - handler: Called with every batch. It must call its `completion` argument to make the
///     subscriber request the next `size` values; for the final batch the call is ignored.
/// - Returns: A type-erased subscriber wrapping the batching behaviour.
///
/// NOTE(review): `buffer` and the stored subscription are not synchronized here, so this assumes
/// `completion` is invoked on the same thread/queue that delivers upstream events — confirm
/// before calling it from arbitrary threads.
func subscriber<Output, Failure>(size: Int, handler: @escaping (_ values: [Output], _ completion: @escaping () -> ()) -> ()) -> AnySubscriber<Output, Failure> {
    // A plain optional instead of an implicitly-unwrapped one: the escaping closure handed to
    // `handler` may run after the subscription has been released, and must then be a no-op
    // rather than a crash.
    var subscription: Subscription?
    var buffer: [Output] = []

    func receiveSubscription(_ sub: Subscription) {
        subscription = sub // Retain the subscription so that we can request more values later.
        sub.request(.max(size))
    }

    func receiveValue(_ value: Output) -> Subscribers.Demand {
        buffer.append(value)
        if buffer.count == size {
            let values = buffer
            buffer = [] // Reset before calling out, in case upstream completes in the meantime.
            handler(values) {
                // Ask for the next batch only once the handler reports it is done. If upstream
                // has already completed, the subscription is gone and nothing happens.
                subscription?.request(.max(size))
            }
        }
        // Additional demand is only ever signalled through the handler's completion closure.
        return .none
    }

    func receiveCompletion(_ completion: Subscribers.Completion<Failure>) {
        // The stream has ended: drop our reference so these closures no longer keep the
        // subscription alive.
        subscription = nil

        guard !buffer.isEmpty else { return }
        let remaining = buffer
        buffer = [] // Leave no stale values behind after the final flush.
        handler(remaining) {
            // We no longer care about the handler calling our completion closure.
        }
    }

    return AnySubscriber(receiveSubscription: receiveSubscription, receiveValue: receiveValue, receiveCompletion: receiveCompletion)
}
/// Demo batch handler: prints the received batch immediately, then prints a message and calls
/// `completion` from the main queue about one second later.
///
/// - Parameters:
///   - values: The batch of values to report.
///   - completion: Invoked on the main queue after the delay has elapsed.
func handler<Output>(_ values: [Output], completion: @escaping () -> ()) {
    print("handler received \(values)")

    let deadline: DispatchTime = .now() + .seconds(1)
    let finish: () -> () = {
        print("handler completed")
        completion()
    }
    DispatchQueue.main.asyncAfter(deadline: deadline, execute: finish)
}
// Demo: publish the integers 1 through 10, log each value as the publisher emits it, and feed
// them to a batching subscriber that hands them to `handler` two at a time.
// `publisher` is a property on `Sequence` in shipping Combine (it was a method only in early
// pre-release SDKs), so it is accessed without parentheses.
(1...10)
    .publisher
    .handleEvents(receiveOutput: { print("publisher sent \($0)") })
    .subscribe(subscriber(size: 2, handler: handler))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Output: