Created
June 26, 2019 07:32
-
-
Save gkaimakas/187ef127167f7a74d0a6d9d54094b82b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Combine | |
/// A Combine `Publisher` whose events are pushed in through an `Observer`.
///
/// Every event sent to the signal's observer is forwarded to each subscriber
/// closure currently registered in `observer.subscribers`.
public final class Signal<Output, Failure: Error>: Publisher {
    /// Input side of the signal. The same object is handed to every
    /// subscriber as its `Subscription`.
    let observer: Observer

    /// Creates a signal together with the observer used to feed it.
    ///
    /// - Returns: The signal and its own observer.
    public static func pipe() -> (Signal, Observer) {
        // The generator runs synchronously inside `init` and is given
        // `signal.observer`, so reading the property afterwards yields the
        // same instance without needing an optional and a force-unwrap.
        let signal = Signal { _, _ in }
        return (signal, signal.observer)
    }

    /// Creates a signal and immediately invokes `generator` with the signal's
    /// observer and the `lifetime` member of the observer's `Lifetime.make()` result.
    ///
    /// - Parameter generator: Called exactly once, synchronously, during initialization.
    public init(_ generator: @escaping (Observer, Lifetime) -> Void) {
        // Fan-out: replay each incoming event to every registered subscriber closure.
        let fanOut = Observer({ (source, event) in
            source.subscribers.modify({ (bag) in
                bag.forEach { forward in
                    forward(source, event)
                }
            })
        })
        observer = fanOut
        generator(fanOut, fanOut.lifetime.lifetime)
    }

    deinit {
        // Tear down the observer when the signal goes away.
        observer.cancel()
    }

    /// Attaches `subscriber` to this signal.
    ///
    /// The subscriber is given its subscription first and only then registered
    /// for events, so it never receives a value or completion before
    /// `receive(subscription:)`, even when the signal has already terminated.
    ///
    /// - Parameter subscriber: The subscriber to attach.
    public func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
        let forward: Observer.Send = { _, event in
            switch event {
            case .cancel:
                // Cancellation is surfaced to the subscriber as a normal finish.
                subscriber.receive(completion: .finished)
            case let .completion(completion):
                subscriber.receive(completion: completion)
            case let .output(value):
                // The returned demand is intentionally discarded.
                _ = subscriber.receive(value)
            }
        }

        // Subscription must precede every other event delivered to the subscriber.
        subscriber.receive(subscription: observer)
        observer.addSubscriberReference(forward, id: subscriber.combineIdentifier)
    }
}
extension Signal {
    /// Receives events on behalf of a `Signal` and passes them to a send closure.
    ///
    /// It also conforms to `Subscription`, so it can be handed to Combine subscribers.
    public final class Observer: Subscription {
        /// Closure invoked with the observer and the event being delivered.
        public typealias Send = (Observer, Event) -> Void

        /// Bag token for each registered subscriber, keyed by its Combine identifier.
        let tokens: Atomic<[CombineIdentifier: Bag<Send>.Token]> = Atomic(initialValue: [:])
        /// Forwarding closures of the currently registered subscribers.
        let subscribers: Atomic<Bag<Send>> = Atomic(initialValue: Bag())
        private let _send: Send
        // NOTE(review): stored but never read in the visible code; `deinit`
        // cancels unconditionally. Confirm the intended semantics.
        private let cancelsOnDeinit: Bool
        /// Becomes `true` once a completion has been sent or the observer was cancelled.
        @Atomic private var isCompleted = false
        internal let lifetime = Lifetime.make()

        internal init(action: @escaping Send, cancelsOnDeinit: Bool) {
            self._send = action
            self.cancelsOnDeinit = cancelsOnDeinit
        }

        /// Creates an observer that passes every event to `action`.
        public init(_ action: @escaping Send) {
            self._send = action
            self.cancelsOnDeinit = false
        }

        /// Creates an observer from one optional callback per kind of event.
        ///
        /// - Parameters:
        ///   - output: Called for each `.output` event.
        ///   - failure: Called when a `.completion(.failure)` event arrives.
        ///   - finished: Called when a `.completion(.finished)` event arrives.
        ///   - cancelled: Called when a `.cancel` event arrives.
        public convenience init(
            output: ((Output) -> Void)? = nil,
            failure: ((Failure) -> Void)? = nil,
            finished: (() -> Void)? = nil,
            cancelled: (() -> Void)? = nil
        ) {
            self.init { _, event in
                switch event {
                case let .output(value):
                    output?(value)
                case .completion(.finished):
                    finished?()
                case let .completion(.failure(error)):
                    failure?(error)
                case .cancel:
                    cancelled?()
                }
            }
        }

        deinit {
            cancel()
        }

        /// Terminates the observer and drops all registered subscribers.
        ///
        /// `.cancel` is emitted only if the observer had not already terminated,
        /// so a completed observer does not deliver a second terminal event when
        /// it is cancelled later (for example from `deinit`).
        public func cancel() {
            let alreadyTerminated = isCompleted
            // Mark terminated before delivering so that a `cancel()` issued
            // while `.cancel` is being delivered does not emit it again.
            isCompleted = true
            if !alreadyTerminated {
                _send(self, .cancel)
            }
            subscribers.swap(Bag<Send>())
        }

        /// Demand is ignored: the body is intentionally empty.
        public func request(_ demand: Subscribers.Demand) {
        }

        /// Delivers a value unless the observer has already terminated.
        public func send(output: Output) {
            guard !isCompleted else { return }
            _send(self, .output(output))
        }

        /// Delivers a terminal event once; later calls are ignored.
        public func send(completion: Subscribers.Completion<Failure>) {
            guard !isCompleted else { return }
            isCompleted = true
            _send(self, .completion(completion))
            // No further events will be delivered; release the subscriber closures.
            subscribers.swap(Bag<Send>())
        }

        /// Registers a subscriber's forwarding closure under `id`.
        ///
        /// If the observer has already terminated, `ref` is immediately sent
        /// `.completion(.finished)` and is not stored.
        func addSubscriberReference(_ ref: @escaping Send, id: CombineIdentifier) {
            guard !isCompleted else {
                ref(self, .completion(.finished))
                return
            }
            subscribers.modify { [weak self] bag in
                let token = bag.insert(ref)
                self?.tokens.modify({ cache in
                    cache[id] = token
                })
            }
        }
    }
}
extension Signal {
    /// The events an `Observer` hands to its send closure.
    public enum Event {
        /// A value carried by the signal.
        case output(Output)

        /// A terminal event: either `.finished` or `.failure`.
        case completion(Subscribers.Completion<Failure>)

        /// Emitted by `Observer.cancel()`.
        case cancel
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment