Created
June 22, 2019 22:23
-
-
Save gkaimakas/3b1d354fbdaeb5298f0d9474d2a68aa5 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Combine | |
class Lifetime { | |
static func +(lhs: Lifetime, rhs: @escaping () -> Void) -> () -> Void { | |
let result: ()->Void = { | |
lhs.end() | |
rhs() | |
} | |
return result | |
} | |
var ended: (() -> Void) = {} | |
var cancellable: Cancellable? | |
var isEnded: Bool = false | |
init(cancellable: Cancellable? = nil) { | |
self.cancellable = cancellable | |
} | |
func end() { | |
ended() | |
isEnded = true | |
cancellable?.cancel() | |
cancellable = nil | |
} | |
} | |
class Observer<Input, Failure> where Failure: Error { | |
var value: ((Input) -> Void)? | |
var completion: ((Subscribers.Completion<Failure>) -> Void)? | |
let lifetime: Lifetime | |
init(lifetime: Lifetime, | |
value: @escaping (Input) -> Void, | |
completion: @escaping (Subscribers.Completion<Failure>) -> Void) { | |
self.lifetime = lifetime | |
self.value = value | |
self.completion = completion | |
lifetime.ended = lifetime + { self.sendCompleted() } | |
} | |
func send(value input: Input) { | |
if lifetime.isEnded { return } | |
value?(input) | |
} | |
func send(failure: Failure) { | |
if lifetime.isEnded { return } | |
completion?(.failure(failure)) | |
completion = nil | |
value = nil | |
} | |
func sendCompleted() { | |
completion?(.finished) | |
value = nil | |
completion = nil | |
} | |
} | |
class WorkSubscription<S>: Subscription where S: Subscriber { | |
var subscriber: S? | |
var work: ((S, Lifetime) -> Void)? | |
init(subscriber: S, work: @escaping (S, Lifetime) -> Void) { | |
self.subscriber = subscriber | |
self.work = work | |
} | |
func request(_ demand: Subscribers.Demand) { | |
guard let subscriber = subscriber, | |
let work = work else { | |
return | |
} | |
work(subscriber, Lifetime(cancellable: self)) | |
} | |
func cancel() { | |
work = nil | |
subscriber = nil | |
} | |
} | |
class WorkPublisher<Output, Failure: Error>: Publisher { | |
let work: (Observer<Output, Failure>, Lifetime) -> Void | |
init(_ work: @escaping (Observer<Output, Failure>, Lifetime) -> Void) { | |
self.work = work | |
} | |
func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input { | |
let subscription = WorkSubscription(subscriber: subscriber) { [weak self] subscriber, lifetime in | |
let observer = Observer<S.Input, S.Failure>(lifetime: lifetime, | |
value: { let _ = subscriber.receive($0) }, | |
completion: { subscriber.receive(completion: $0) }) | |
self?.work(observer, lifetime) | |
} | |
subscriber.receive(subscription: subscription) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment