Skip to content

Instantly share code, notes, and snippets.

@gkaimakas
Created June 20, 2019 07:34
Show Gist options
  • Save gkaimakas/b5b1ad8ac08c6aeacbc787f549417bf6 to your computer and use it in GitHub Desktop.
Save gkaimakas/b5b1ad8ac08c6aeacbc787f549417bf6 to your computer and use it in GitHub Desktop.
import Combine
extension Subscribers {
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public final class SinkEvents<Upstream>: Subscriber where Upstream: Publisher {
public typealias Input = Upstream.Output
public typealias Failure = Upstream.Failure
let lifetime: Cancellable
internal var subscription: Subscription?
internal let events = PassthroughSubject<SinkEvent<Upstream.Output, Upstream.Failure>, Never>()
public init(receiveEvent: @escaping (SinkEvent<Upstream.Output, Upstream.Failure>) -> Void) {
lifetime = events.sink { receiveEvent($0) }
}
deinit {
lifetime.cancel()
}
public func receive(subscription: Subscription) {
self.subscription = subscription
subscription.request(.unlimited)
}
public func receive(_ input: Upstream.Output) -> Subscribers.Demand {
events.send(.input(input))
return .unlimited
}
public func receive(completion: Subscribers.Completion<Upstream.Failure>) {
events.send(.completion(completion))
subscription?.cancel()
}
}
}
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public enum SinkEvent<Input, Failure: Error> {
case input(Input)
case completion(Subscribers.Completion<Failure>)
}
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
extension Publisher {
public func sinkEvents(_ receiveEvent: @escaping (SinkEvent<Self.Output, Self.Failure>)-> Void) -> Subscribers.SinkEvents<Self> {
let subscriber = Subscribers.SinkEvents<Self>(receiveEvent: receiveEvent)
self.subscribe(subscriber)
return subscriber
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment