Skip to content

Instantly share code, notes, and snippets.

@gkaimakas
Created June 23, 2019 12:45
Show Gist options
  • Save gkaimakas/8d64f22f446e5c587fb1c61be5eb70cc to your computer and use it in GitHub Desktop.
Save gkaimakas/8d64f22f446e5c587fb1c61be5eb70cc to your computer and use it in GitHub Desktop.
import Combine
extension Publishers {
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public struct Materialize<Upstream>: Publisher where Upstream: Publisher {
public typealias Output = Event<Upstream.Output, Upstream.Failure>
public typealias Failure = Never
internal let cancellable = SerialCancellable()
internal let stream: PassthroughSubject<Output, Never>
internal let upstream: AnyPublisher<Upstream.Output, Upstream.Failure>
public init(_ upstream: Upstream) {
let _stream = PassthroughSubject<Output, Never>()
self.upstream = upstream
.handleEvents(receiveSubscription: { _stream.send(.subscription($0)) },
receiveOutput: { _stream.send(.output($0)) },
receiveCompletion: {
_stream.send(.completion($0))
_stream.send(completion: .finished) },
receiveCancel: {
_stream.send(.cancel)
_stream.send(completion: .finished) },
receiveRequest: { _stream.send(.request($0)) })
.eraseToAnyPublisher()
stream = _stream
}
public func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
stream.subscribe(subscriber)
cancellable.inner = upstream.sink(receiveValue: { _ in })
}
}
}
extension Publisher {
/// Treat all Events from `self` as plain values, allowing them to be
/// manipulated just like any other value.
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public func materialize() -> Publishers.Materialize<Self>{
return Publishers.Materialize(self)
}
}
extension Publishers {
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public enum Event<Output, Failure: Error> {
case subscription(Subscription)
case output(Output)
case completion(Subscribers.Completion<Failure>)
case cancel
case request(Subscribers.Demand)
}
}
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public class SerialCancellable: Cancellable {
private var _inner: Cancellable? = nil
public var inner: Cancellable? {
get {
_inner
}
set {
_inner?.cancel()
_inner = newValue
if isCancelled {
_inner?.cancel()
}
}
}
private var _isCancelled: Bool = false
public var isCancelled: Bool {
_isCancelled
}
public init() { }
public func cancel() {
_isCancelled = true
_inner?.cancel()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment