Skip to content

Instantly share code, notes, and snippets.

@onevcat
Created December 13, 2019 08:15
Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save onevcat/baecc584e3cbfa2cc161290b2dfd300a to your computer and use it in GitHub Desktop.
Save onevcat/baecc584e3cbfa2cc161290b2dfd300a to your computer and use it in GitHub Desktop.
import Foundation
import Combine
/// A capability for objects that can be told to continue after pausing.
///
/// In this file the only conformer is `Subscribers.ResumableSink`, which uses
/// `resume()` to request one more value from its subscription when it is not
/// already pulling.
protocol Resumable {
/// Asks the receiver to continue; what "continuing" means is up to the conformer.
func resume()
}
extension Subscribers {
    /// A sink that pulls values from its upstream one at a time and lets the
    /// value handler decide, per value, whether to keep pulling.
    ///
    /// When `receiveValue` returns `true`, one more value is demanded. When it
    /// returns `false`, no further demand is issued until `resume()` is called.
    class ResumableSink<Input, Failure: Error>: Subscriber, Cancellable, Resumable {
        /// Called when the upstream delivers a completion (finished or failure).
        let receiveCompletion: (Subscribers.Completion<Failure>) -> Void

        /// Called for each value; return `true` to demand the next value,
        /// `false` to pause until `resume()`.
        let receiveValue: (Input) -> Bool

        /// `true` while the sink has demanded a value (or the handler asked to
        /// keep pulling); `false` while paused.
        var shouldPullNewValue: Bool = false

        /// The active upstream subscription; `nil` before subscribing and after
        /// completion or cancellation.
        var subscription: Subscription?

        /// Creates a sink with the given completion and value handlers.
        ///
        /// - Parameters:
        ///   - receiveCompletion: Invoked with the upstream's completion.
        ///   - receiveValue: Invoked with each value; its result controls
        ///     whether another value is demanded.
        init(
            receiveCompletion: @escaping (Subscribers.Completion<Failure>) -> Void,
            receiveValue: @escaping (Input) -> Bool
        ) {
            self.receiveCompletion = receiveCompletion
            self.receiveValue = receiveValue
        }

        /// Stores the subscription and issues the initial demand of one value.
        func receive(subscription: Subscription) {
            self.subscription = subscription
            resume()
        }

        /// Forwards the value to `receiveValue` and demands one more value only
        /// if the handler returned `true`.
        func receive(_ input: Input) -> Subscribers.Demand {
            shouldPullNewValue = receiveValue(input)
            return shouldPullNewValue ? .max(1) : .none
        }

        /// Forwards the completion to `receiveCompletion` and drops the subscription.
        func receive(completion: Subscribers.Completion<Failure>) {
            receiveCompletion(completion)
            subscription = nil
        }

        /// Cancels the upstream subscription, if any, and drops it.
        func cancel() {
            subscription?.cancel()
            subscription = nil
        }

        /// Demands one value from the upstream if the sink is currently paused.
        ///
        /// Does nothing when a value is already being pulled, or when there is
        /// no subscription (not yet subscribed, completed, or cancelled).
        func resume() {
            // Only mark the sink as pulling when demand can actually be sent.
            // Flipping the flag without a subscription would make the initial
            // `resume()` in `receive(subscription:)` bail out, and the sink
            // would never request its first value.
            guard !shouldPullNewValue, let subscription = subscription else {
                return
            }
            shouldPullNewValue = true
            subscription.request(.max(1))
        }
    }
}
extension Publisher {
    /// Attaches a `Subscribers.ResumableSink` to this publisher and returns it.
    ///
    /// - Parameters:
    ///   - receiveCompletion: Handler passed to the sink for the completion event.
    ///   - receiveValue: Handler passed to the sink for each value; its `Bool`
    ///     result is what the sink uses to decide whether to demand another value.
    /// - Returns: The subscribed sink, exposed only as `Cancellable & Resumable`.
    func resumableSink(
        receiveCompletion: @escaping (Subscribers.Completion<Failure>) -> Void,
        receiveValue: @escaping (Output) -> Bool
    ) -> Cancellable & Resumable {
        let resumable = Subscribers.ResumableSink<Output, Failure>(
            receiveCompletion: receiveCompletion,
            receiveValue: receiveValue
        )
        subscribe(resumable)
        return resumable
    }
}
// Demo: pull integers in batches of five, then refill once per second.

// Values collected since the last timer tick.
var buffer: [Int] = []

// Pull from the unbounded sequence 1, 2, 3, … until the buffer holds five values.
let subscriber = (1...).publisher
    .print()
    .resumableSink(
        receiveCompletion: { completion in
            print("Completion: \(completion)")
        },
        receiveValue: { value in
            print("Receive value: \(value)")
            buffer.append(value)
            // Returning `false` once five values are buffered pauses the sink.
            return buffer.count < 5
        }
    )

// Every second on the main run loop: empty the buffer and resume pulling.
let cancellable = Timer
    .publish(every: 1, on: .main, in: .default)
    .autoconnect()
    .sink { _ in
        buffer.removeAll()
        subscriber.resume()
    }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment