Skip to content

Instantly share code, notes, and snippets.

@tilltue
Last active December 5, 2021 16:52
Show Gist options
  • Save tilltue/d4aceffc777935448686eac1b91a157e to your computer and use it in GitHub Desktop.
Save tilltue/d4aceffc777935448686eac1b91a157e to your computer and use it in GitHub Desktop.
public enum UploadStatus<Value> {
case progress(Progress)
case completion(Value)
case error(Error)
}
public protocol DataListner: AnyObject {
func remove()
}
public final class UploadDataStreamPublisher<Value>: Publisher {
public typealias Output = UploadStatus<Value>
public typealias Failure = Never
public typealias UploadHandler = (@escaping (Output) -> Void) -> DataListner?
private var uploadHandler: UploadHandler
public init(handler: @escaping UploadHandler) {
self.uploadHandler = handler
}
public func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
let subscription = Inner(uploadHandler: uploadHandler, downstream: subscriber)
subscriber.receive(subscription: subscription)
}
private final class Inner<Downstream: Subscriber>: Subscription, Cancellable where Downstream.Input == Output {
typealias Failure = Downstream.Failure
private var downstream: Downstream?
private var listener: DataListner?
private let uploadHandler: UploadHandler
init(uploadHandler: @escaping UploadHandler, downstream: Downstream) {
self.uploadHandler = uploadHandler
self.downstream = downstream
}
func request(_ demand: Subscribers.Demand) {
assert(demand > 0)
guard let downstream = downstream else { return }
self.downstream = nil
listener = uploadHandler { uploadStatus in
_ = downstream.receive(uploadStatus)
if case .completion = uploadStatus {
downstream.receive(completion: .finished)
}
}
}
func cancel() {
listener?.remove()
downstream = nil
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment