Skip to content

Instantly share code, notes, and snippets.

@gkaimakas
Created June 26, 2019 07:32
Show Gist options
  • Save gkaimakas/187ef127167f7a74d0a6d9d54094b82b to your computer and use it in GitHub Desktop.
Save gkaimakas/187ef127167f7a74d0a6d9d54094b82b to your computer and use it in GitHub Desktop.
import Combine
public final class Signal<Output, Failure: Error>: Publisher {
let observer: Observer
public static func pipe() -> (Signal, Observer) {
var _observer: Observer? = nil
let signal = Signal { (observer, lifetime) in
_observer = observer
}
return (signal, _observer!)
}
public init(_ generator: @escaping (Observer, Lifetime) -> Void) {
let _observer = Observer({ (observer, event) in
observer.subscribers.modify({ (bag) in
bag.forEach { reference in
reference(observer, event)
}
})
})
observer = _observer
generator(observer, observer.lifetime.lifetime)
}
deinit {
observer.cancel()
}
public func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
let reference: Observer.Send = { observer, event -> Void in
switch event {
case .cancel:
subscriber.receive(completion: .finished)
case let .completion(value):
subscriber.receive(completion: value)
case let .output(value):
let _ = subscriber.receive(value)
}
}
observer.addSubscriberReference(reference, id: subscriber.combineIdentifier)
subscriber.receive(subscription: observer)
}
}
extension Signal {
public final class Observer: Subscription {
public typealias Send = (Observer, Event) -> Void
let tokens: Atomic<[CombineIdentifier: Bag<Send>.Token]> = Atomic(initialValue: [:])
let subscribers: Atomic<Bag<Send>> = Atomic(initialValue: Bag())
private let _send: Send
private let cancelsOnDeinit: Bool
@Atomic private var isCompleted = false
internal let lifetime = Lifetime.make()
internal init(action: @escaping Send, cancelsOnDeinit: Bool) {
self._send = action
self.cancelsOnDeinit = cancelsOnDeinit
}
public init(_ action: @escaping Send) {
self._send = action
self.cancelsOnDeinit = false
}
public convenience init(
output: ((Output) -> Void)? = nil,
failure: ((Failure) -> Void)? = nil,
finished: (() -> Void)? = nil,
cancelled: (() -> Void)? = nil
) {
self.init { bag, event in
switch event {
case let .output(v):
output?(v)
case let .completion(completion):
switch completion {
case .finished:
finished?()
case .failure(let error):
failure?(error)
}
case .cancel:
cancelled?()
}
}
}
deinit {
cancel()
}
public func cancel() {
_send(self, .cancel)
isCompleted = true
subscribers.swap(Bag<Send>())
}
public func request(_ demand: Subscribers.Demand) {
}
public func send(output: Output) {
if isCompleted == true { return }
_send(self, .output(output))
}
public func send(completion: Subscribers.Completion<Failure>) {
if isCompleted == true { return }
isCompleted = true
_send(self, .completion(completion))
}
func addSubscriberReference(_ ref: @escaping Send, id: CombineIdentifier) {
if isCompleted == true {
ref(self, .completion(.finished))
return
}
subscribers.modify { [weak self] bag in
let token = bag.insert(ref)
self?.tokens.modify({ cache in
cache[id] = token
})
}
}
}
}
extension Signal {
public enum Event {
case output(Output)
case completion(Subscribers.Completion<Failure>)
case cancel
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment