Skip to content

Instantly share code, notes, and snippets.

@ipavlidakis
Created December 1, 2019 19:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ipavlidakis/8c412bd788f5c93bcb72f00ce0398c1a to your computer and use it in GitHub Desktop.
Save ipavlidakis/8c412bd788f5c93bcb72f00ce0398c1a to your computer and use it in GitHub Desktop.
Universal Combine
import Foundation
//
// UniversalCombine.swift
// UniversalCombine
//
// Created by Ilias Pavlidakis on 30/11/2019.
//
import Foundation
/// Empty placeholder type; it declares no stored properties, initialisers or methods.
class UniversalCombine {}
/// A source that subscribers can be attached to.
///
/// `Output` is the type of value handed to subscribers and `Failure` is the
/// error type that can be reported to them.
protocol Publisher {

    associatedtype Output
    associatedtype Failure: Error

    /// Attaches `subscriber` to this publisher. Nothing is returned to the caller.
    ///
    /// - Parameter subscriber: A subscriber whose `Input`/`Failure` match this
    ///   publisher's `Output`/`Failure`.
    func subscribe<S: Subscriber>(_ subscriber: S) where S.Input == Output, S.Failure == Failure

    /// Attaches `subscriber` to this publisher and hands back an optional `Cancellable`.
    ///
    /// - Parameter subscriber: A subscriber whose `Input`/`Failure` match this
    ///   publisher's `Output`/`Failure`.
    /// - Returns: A `Cancellable` for the attachment, or `nil`.
    func subscribe<S: Subscriber>(subscriber: S) -> Cancellable? where S.Input == Output, S.Failure == Failure

    /// Returns a type-erased wrapper around this publisher.
    func eraseToAnyPublisher() -> AnyPublisher<Output, Failure>
}
extension Publisher {

    /// Default type erasure: boxes `self` in a newly created `AnyPublisher`.
    ///
    /// Every call builds a new wrapper; no identifier is passed, so the
    /// wrapper's initialiser default is used.
    func eraseToAnyPublisher() -> AnyPublisher<Output, Failure> {
        return AnyPublisher(publisher: self)
    }
}
/// A class-bound receiver of the events produced by a `Publisher`.
///
/// The protocol is constrained with `AnyObject`, the current spelling of the
/// deprecated `: class` constraint. Being class-bound is what allows
/// conforming instances to be captured with `weak`.
protocol Subscriber: AnyObject {

    /// Type of value this subscriber accepts.
    associatedtype Input
    /// Type of error this subscriber can be told about.
    associatedtype Failure: Error

    /// Hands the subscriber the `Subscription` for an attachment.
    func receive(subscription: Subscription)

    /// Delivers one value to the subscriber.
    func receive(_ input: Input)

    /// Delivers a `Subscribers.Completion` event to the subscriber.
    func receive(completion: Subscribers.Completion<Failure>)

    /// Returns a type-erased wrapper around this subscriber.
    func eraseToAnySubscriber() -> AnySubscriber<Input, Failure>
}
extension Subscriber {

    /// Default type erasure: boxes `self` in a newly created `AnySubscriber`.
    ///
    /// Every call builds a new wrapper; no identifier is passed, so the
    /// wrapper's initialiser default is used.
    func eraseToAnySubscriber() -> AnySubscriber<Input, Failure> {
        return AnySubscriber(subscriber: self)
    }
}
/// Something that can be cancelled.
protocol Cancellable {

    /// Performs the cancellation. The protocol itself places no restriction
    /// on how often this may be called.
    func cancel()
}
/// The handle a subscriber is given through `receive(subscription:)`.
///
/// It declares its own `cancel()` requirement and does not inherit from
/// `Cancellable`.
protocol Subscription {

    /// Cancels the subscription.
    func cancel()
}
/// A type that is both a `Publisher` and a `Subscriber` and that can also be
/// fed values directly through `send(_:)`.
protocol Subject: Publisher, Subscriber {
/// Publisher type whose `Output` is the value type accepted by `send(_:)`.
associatedtype Upstream: Publisher
/// Subscriber type associated with the downstream side of the subject.
associatedtype Downstream: Subscriber
/// Pushes `input` into the subject.
func send(_ input: Upstream.Output)
}
// CONCRETE
/// Namespace for support types used by subscribers.
enum Subscribers {

    /// The event passed to `Subscriber.receive(completion:)`.
    enum Completion<Failure: Error> {

        /// Completion that carries no error.
        case completed

        /// Completion that carries the error `error`.
        case failed(_ error: Failure)
    }
}
extension Publisher {

    /// Returns a publisher that passes every upstream value through `transform`.
    ///
    /// - Parameter transform: Conversion applied to each value.
    /// - Returns: The type-erased `Publishers.Map` stage.
    func map<T>(_ transform: @escaping (Self.Output) -> T) -> AnyPublisher<T, Self.Failure> {
        let stage = Publishers.Map<Self, _Subscriber<T, Failure>>(
            upstream: eraseToAnyPublisher(),
            transform)
        return stage.eraseToAnyPublisher()
    }

    /// Returns a publisher that passes every upstream value through the
    /// optional-returning `transform`.
    ///
    /// Despite the name, this is built on `Publishers.Map`, so the resulting
    /// publisher's output type is `T?` and `nil` results are not filtered out
    /// by this operator.
    ///
    /// - Parameter transform: Conversion applied to each value; may return `nil`.
    /// - Returns: The type-erased `Publishers.Map` stage.
    func compactMap<T>(_ transform: @escaping (Self.Output) -> T?) -> AnyPublisher<T?, Self.Failure> {
        let stage = Publishers.Map<Self, _Subscriber<T?, Failure>>(
            upstream: eraseToAnyPublisher(),
            transform)
        return stage.eraseToAnyPublisher()
    }

    /// Returns a publisher backed by a `Publishers.Print` stage built with its
    /// default transform.
    func print() -> AnyPublisher<Self.Output, Self.Failure> {
        let stage = Publishers.Print<Self, _Subscriber<Output, Failure>>(upstream: eraseToAnyPublisher())
        return stage.eraseToAnyPublisher()
    }

    /// Returns a publisher backed by a `Publishers.DebugPrint` stage built with
    /// its default transform.
    func debugPrint() -> AnyPublisher<Self.Output, Self.Failure> {
        let stage = Publishers.DebugPrint<Self, _Subscriber<Output, Failure>>(upstream: eraseToAnyPublisher())
        return stage.eraseToAnyPublisher()
    }

    /// Returns a publisher backed by a `Publishers.DispatchOn` stage bound to
    /// `dispatchQueue`.
    ///
    /// - Parameter dispatchQueue: Queue handed to the `DispatchOn` stage.
    func dispatch(on dispatchQueue: DispatchQueue) -> AnyPublisher<Self.Output, Self.Failure> {
        let stage = Publishers.DispatchOn<Self, _Subscriber<Output, Failure>>(
            upstream: eraseToAnyPublisher(),
            dispatchQueue: dispatchQueue)
        return stage.eraseToAnyPublisher()
    }

    /// Returns a publisher backed by a `Publishers.ThreadReport` stage.
    func threadInfo() -> AnyPublisher<Self.Output, Self.Failure> {
        let stage = Publishers.ThreadReport<Self, _Subscriber<Output, Failure>>(upstream: eraseToAnyPublisher())
        return stage.eraseToAnyPublisher()
    }
}
/// Namespace for the concrete operator stages. Every stage is a `_Subject`
/// subclass.
enum Publishers {

    /// Stage that adds nothing to `_Subject`; it exists to give the mapping
    /// operator a named type.
    final class Map<Upstream: Publisher, Downstream: Subscriber>: _Subject<Upstream, Downstream>
        where Upstream.Failure == Downstream.Failure {}

    /// Declares no members of its own, so as written it is the same as `Map`.
    final class CompactMap<Upstream: Publisher, Downstream: Subscriber>: _Subject<Upstream, Downstream>
        where Upstream.Failure == Downstream.Failure {}

    /// Identity stage that runs a side-effect closure on each value before
    /// handing the value to `_Subject.receive(_:)`. The default closure calls
    /// `Swift.print`.
    final class Print<Upstream: Publisher, Downstream: Subscriber>: _Subject<Upstream, Downstream>
        where Upstream.Failure == Downstream.Failure, Upstream.Output == Downstream.Input {

        /// Side effect invoked with every received value.
        private let sideEffect: (Downstream.Input) -> Void

        /// - Parameters:
        ///   - upstream: Publisher this stage subscribes to.
        ///   - transform: Side effect run per value; defaults to `Swift.print`.
        init(
            upstream: AnyPublisher<Upstream.Output, Upstream.Failure>,
            _ transform: @escaping (Downstream.Input) -> Void = { Swift.print($0) }
        ) {
            sideEffect = transform
            super.init(upstream: upstream) { $0 }
        }

        override func receive(_ input: Input) {
            sideEffect(input)
            super.receive(input)
        }
    }

    /// Identity stage that runs a side-effect closure on each value before
    /// handing the value to `_Subject.receive(_:)`. The default closure calls
    /// `Swift.debugPrint`.
    final class DebugPrint<Upstream: Publisher, Downstream: Subscriber>: _Subject<Upstream, Downstream>
        where Upstream.Failure == Downstream.Failure, Upstream.Output == Downstream.Input {

        /// Side effect invoked with every received value.
        private let sideEffect: (Downstream.Input) -> Void

        /// - Parameters:
        ///   - upstream: Publisher this stage subscribes to.
        ///   - transform: Side effect run per value; defaults to `Swift.debugPrint`.
        init(
            upstream: AnyPublisher<Upstream.Output, Upstream.Failure>,
            _ transform: @escaping (Downstream.Input) -> Void = { Swift.debugPrint($0) }
        ) {
            sideEffect = transform
            super.init(upstream: upstream) { $0 }
        }

        override func receive(_ input: Input) {
            sideEffect(input)
            super.receive(input)
        }
    }

    /// Identity stage that defers the handling of each value to an
    /// asynchronous block on a given queue. Only `receive(_:)` is overridden.
    final class DispatchOn<Upstream: Publisher, Downstream: Subscriber>: _Subject<Upstream, Downstream>
        where Upstream.Failure == Downstream.Failure, Upstream.Output == Downstream.Input {

        private let dispatchQueue: DispatchQueue

        /// - Parameters:
        ///   - upstream: Publisher this stage subscribes to.
        ///   - dispatchQueue: Queue on which received values are processed.
        init(upstream: AnyPublisher<Upstream.Output, Upstream.Failure>, dispatchQueue: DispatchQueue) {
            self.dispatchQueue = dispatchQueue
            super.init(upstream: upstream) { $0 }
        }

        override func receive(_ input: Input) {
            dispatchQueue.async {
                super.receive(input)
            }
        }
    }

    /// Identity stage that prints whether the current thread is the main
    /// thread each time a value is received.
    final class ThreadReport<Upstream: Publisher, Downstream: Subscriber>: _Subject<Upstream, Downstream>
        where Upstream.Failure == Downstream.Failure, Upstream.Output == Downstream.Input {

        /// - Parameter upstream: Publisher this stage subscribes to.
        init(upstream: AnyPublisher<Upstream.Output, Upstream.Failure>) {
            super.init(upstream: upstream) { $0 }
        }

        override func receive(_ input: Input) {
            let onMainThread = Thread.current.isMainThread
            Swift.print("Is this thread, the main thread: \(onMainThread)")
            super.receive(input)
        }
    }
}
/// A `Subscription` that wraps a `Cancellable` and holds references to the
/// publisher and subscriber it was created with until it is cancelled.
final class AnySubscription<Input, Output, Failure: Error>: Subscription, Cancellable {

    private let cancellable: Cancellable
    private var publisher: AnyPublisher<Output, Failure>?
    private var subscriber: AnySubscriber<Input, Failure>?

    /// - Parameters:
    ///   - cancellable: Work to run when this subscription is cancelled.
    ///   - publisher: Publisher reference held until cancellation.
    ///   - subscriber: Subscriber reference held until cancellation.
    init(
        cancellable: Cancellable,
        publisher: AnyPublisher<Output, Failure>,
        subscriber: AnySubscriber<Input, Failure>
    ) {
        self.cancellable = cancellable
        self.publisher = publisher
        self.subscriber = subscriber
    }

    /// Runs the wrapped cancellable, then drops the stored publisher and
    /// subscriber references. The wrapped cancellable is invoked on every call.
    func cancel() {
        defer {
            publisher = nil
            subscriber = nil
        }
        cancellable.cancel()
    }
}
/// A `Cancellable` backed by a closure.
final class AnyCancellable: Cancellable {

    /// Closure executed by `cancel()`.
    private let onCancel: () -> Void

    /// - Parameter cancellationClosure: Work to perform whenever `cancel()` is called.
    init(cancellationClosure: @escaping () -> Void) {
        onCancel = cancellationClosure
    }

    /// Invokes the stored closure. It runs on every call, not only the first.
    func cancel() {
        onCancel()
    }
}
/// Type-erased `Publisher`.
///
/// Both `subscribe` variants erase the incoming subscriber and forward it to
/// the wrapped publisher. Equality and hashing are based only on `identifier`.
final class AnyPublisher<Output, Failure: Error>: Publisher, Equatable, Hashable {

    private let identifier: AnyHashable
    /// The wrapped publisher, stored as `Any`.
    private let _publisher: Any
    private let _subscribeWithoutCancellable: (AnySubscriber<Output, Failure>) -> Void
    private let _subscribeWithCancellable: (AnySubscriber<Output, Failure>) -> Cancellable?

    /// Wraps `publisher`.
    ///
    /// - Parameters:
    ///   - identifier: Value used for `==` and `hash(into:)`; defaults to a new `UUID`.
    ///   - publisher: Publisher that receives all forwarded subscriptions.
    init<P: Publisher>(identifier: AnyHashable = UUID(), publisher: P) where P.Output == Output, P.Failure == Failure {
        self.identifier = identifier
        _publisher = publisher
        _subscribeWithoutCancellable = { erased in publisher.subscribe(erased) }
        _subscribeWithCancellable = { erased in publisher.subscribe(subscriber: erased) }
    }

    /// Builds an `AnyPublisher` around a new `_Publisher`.
    class func create() -> AnyPublisher<Output, Failure> {
        return AnyPublisher<Output, Failure>(publisher: _Publisher<Output, Failure>())
    }

    static func == (lhs: AnyPublisher<Output, Failure>, rhs: AnyPublisher<Output, Failure>) -> Bool {
        return lhs.identifier == rhs.identifier
    }

    func hash(into hasher: inout Hasher) {
        hasher.combine(identifier)
    }

    func subscribe<S>(_ subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
        let erased = subscriber.eraseToAnySubscriber()
        _subscribeWithoutCancellable(erased)
    }

    func subscribe<S>(subscriber: S) -> Cancellable? where S : Subscriber, Failure == S.Failure, Output == S.Input {
        let erased = subscriber.eraseToAnySubscriber()
        return _subscribeWithCancellable(erased)
    }
}
/// Type-erased `Subscriber`.
///
/// The wrapped subscriber is kept alive through `_subscriber`; the forwarding
/// closures capture it weakly and do nothing if it is gone. Equality and
/// hashing are based only on `identifier`.
final class AnySubscriber<Input, Failure: Error>: Subscriber, Equatable, Hashable {

    private let identifier: AnyHashable
    /// The wrapped subscriber, stored as `Any`.
    private let _subscriber: Any
    private let _receiveSubscription: (Subscription) -> ()
    private let _receiveInput: (Input) -> ()
    private let _receiveCompletion: (Subscribers.Completion<Failure>) -> Void

    static func == (lhs: AnySubscriber<Input, Failure>, rhs: AnySubscriber<Input, Failure>) -> Bool {
        lhs.identifier == rhs.identifier
    }

    /// Builds an `AnySubscriber` around a new `_Subscriber`.
    ///
    /// The factory uses the class's own `Input` and `Failure`. It previously
    /// declared method-level generic parameters with the same names, which
    /// shadowed the class parameters: the result type was unrelated to the
    /// type `create()` was called on and could only be resolved from the
    /// call-site context. This now mirrors `AnyPublisher.create()`.
    static func create() -> AnySubscriber<Input, Failure> {
        AnySubscriber<Input, Failure>(subscriber: _Subscriber<Input, Failure>())
    }

    /// Wraps `subscriber`.
    ///
    /// - Parameters:
    ///   - identifier: Value used for `==` and `hash(into:)`; defaults to a new `UUID`.
    ///   - subscriber: Subscriber that receives all forwarded events.
    init<S: Subscriber>(identifier: AnyHashable = UUID(), subscriber: S) where S.Input == Input, S.Failure == Failure {
        self.identifier = identifier
        self._subscriber = subscriber
        self._receiveSubscription = { [weak subscriber] in subscriber?.receive(subscription: $0) }
        self._receiveInput = { [weak subscriber] in subscriber?.receive($0) }
        self._receiveCompletion = { [weak subscriber] in subscriber?.receive(completion: $0) }
    }

    func hash(into hasher: inout Hasher) { hasher.combine(identifier) }

    /// Forwards the subscription to the wrapped subscriber.
    func receive(subscription: Subscription) { _receiveSubscription(subscription) }

    /// Forwards the value to the wrapped subscriber.
    func receive(_ input: Input) { _receiveInput(input) }

    /// Forwards the completion to the wrapped subscriber.
    func receive(completion: Subscribers.Completion<Failure>) { _receiveCompletion(completion) }
}
/// Type-erased `Subject`.
///
/// Although it subclasses `_Subject`, every publisher and subscriber entry
/// point is overridden to forward to the wrapped subject. The base class is
/// initialised with a transform that traps; because `receive(_:)` is
/// overridden here, that transform is not reached through this class's own
/// `receive(_:)`. Equality and hashing are based only on `identifier`.
final class AnySubject<Upstream: Publisher, Downstream: Subscriber>: _Subject<Upstream, Downstream>, Equatable, Hashable where Downstream.Failure == Upstream.Failure {

    private let identifier: AnyHashable
    /// The wrapped subject, stored as `Any`.
    private let _subject: Any
    private let _subscribeWithoutCancellable: (AnySubscriber<Downstream.Input, Downstream.Failure>) -> Void
    private let _subscribeWithCancellable: (AnySubscriber<Downstream.Input, Downstream.Failure>) -> Cancellable?
    private let _receiveSubscription: (Subscription) -> ()
    private let _receiveInput: (Upstream.Output) -> ()
    private let _receiveCompletion: (Subscribers.Completion<Failure>) -> Void

    /// Wraps `subject`.
    ///
    /// - Parameters:
    ///   - identifier: Value used for `==` and `hash(into:)`; defaults to a new `UUID`.
    ///   - subject: Subject that receives all forwarded calls. The subscribe
    ///     closures capture it strongly, the receive closures weakly.
    init<S: Subject>(
        identifier: AnyHashable = UUID(),
        subject: S
    ) where S.Failure == Upstream.Failure, S.Output == Downstream.Input, S.Input == Upstream.Output {
        self.identifier = identifier
        _subject = subject
        _subscribeWithoutCancellable = { erased in subject.subscribe(erased) }
        _subscribeWithCancellable = { erased in subject.subscribe(subscriber: erased) }
        _receiveSubscription = { [weak subject] in subject?.receive(subscription: $0) }
        _receiveInput = { [weak subject] in subject?.receive($0) }
        _receiveCompletion = { [weak subject] in subject?.receive(completion: $0) }
        super.init { _ in fatalError("Transform call shouldn't be forwarded") }
    }

    /// Builds an `AnySubject` around a new `_Subject`.
    ///
    /// - Parameters:
    ///   - upstream: Optional publisher the new `_Subject` is given.
    ///   - transform: Conversion from upstream output to downstream input.
    class func create(
        upstream: AnyPublisher<Upstream.Output, Upstream.Failure>? = nil,
        transform: @escaping (Upstream.Output) -> Downstream.Input
    ) -> AnySubject<Upstream, Downstream> {
        let backing = _Subject<Upstream, Downstream>(upstream: upstream, transform)
        return AnySubject<Upstream, Downstream>(subject: backing)
    }

    static func == (lhs: AnySubject<Upstream, Downstream>, rhs: AnySubject<Upstream, Downstream>) -> Bool {
        return lhs.identifier == rhs.identifier
    }

    func hash(into hasher: inout Hasher) {
        hasher.combine(identifier)
    }

    override func subscribe<S: Subscriber>(_ subscriber: S) where Failure == S.Failure, Output == S.Input {
        let erased = subscriber.eraseToAnySubscriber()
        _subscribeWithoutCancellable(erased)
    }

    override func subscribe<S>(subscriber: S) -> Cancellable? where S : Subscriber, Failure == S.Failure, Output == S.Input {
        let erased = subscriber.eraseToAnySubscriber()
        return _subscribeWithCancellable(erased)
    }

    override func receive(subscription: Subscription) {
        _receiveSubscription(subscription)
    }

    override func receive(_ input: Input) {
        _receiveInput(input)
    }

    override func receive(completion: Subscribers.Completion<Failure>) {
        _receiveCompletion(completion)
    }
}
/// Basic `Publisher` that keeps a list of its type-erased subscribers.
///
/// The class only manages the subscriber list; it declares no method that
/// delivers values or completions to the stored subscribers.
class _Publisher<Output, Failure: Error>: Publisher {

    private var subscribers: [AnySubscriber<Output, Failure>] = []

    /// Builds a cancellable that removes `anySubscriber` from the list. The
    /// closure captures `self` strongly.
    private func cancellable(for anySubscriber: AnySubscriber<Output, Failure>) -> Cancellable {
        return AnyCancellable {
            self.subscribers.removeAll(where: { $0 == anySubscriber })
        }
    }

    /// Registers `subscriber` and hands it an `AnySubscription` whose
    /// cancellation removes it from the list again.
    func subscribe<S: Subscriber>(_ subscriber: S) where Failure == S.Failure, Output == S.Input {
        let erased = subscriber.eraseToAnySubscriber()
        subscribers.append(erased)
        let subscription = AnySubscription(
            cancellable: cancellable(for: erased),
            publisher: eraseToAnyPublisher(),
            subscriber: erased)
        subscriber.receive(subscription: subscription)
    }

    /// Registers `subscriber` and returns a cancellable that removes it from
    /// the list. The subscriber is not sent a subscription by this variant.
    func subscribe<S: Subscriber>(subscriber: S) -> Cancellable? where Failure == S.Failure, Output == S.Input {
        let erased = subscriber.eraseToAnySubscriber()
        subscribers.append(erased)
        return cancellable(for: erased)
    }
}
/// Basic `Subscriber` that remembers its subscription, ignores values and
/// cancels the subscription when a completion arrives.
class _Subscriber<Input, Failure: Error>: Subscriber {

    /// Most recently received subscription, if any.
    private var activeSubscription: Subscription?

    /// Stores `subscription`, replacing any previously stored one.
    func receive(subscription: Subscription) {
        activeSubscription = subscription
    }

    /// Intentionally empty; subclasses override this to handle values.
    func receive(_ input: Input) {}

    /// Cancels the stored subscription. The completion value itself is not inspected.
    func receive(completion: Subscribers.Completion<Failure>) {
        activeSubscription?.cancel()
    }
}
/// Base implementation of `Subject`: subscribes to an optional upstream,
/// converts each received value with a transform, and hands the result to
/// every registered downstream subscriber.
///
/// NOTE(review): `receive(completion:)` is not overridden here, so the
/// inherited `_Subscriber` behaviour applies (the upstream subscription is
/// cancelled) and completion events are not relayed to the subscribers held
/// by this subject.
class _Subject<Upstream: Publisher, Downstream: Subscriber>: _Subscriber<Upstream.Output, Upstream.Failure>, Subject where Downstream.Failure == Upstream.Failure {

    typealias Output = Downstream.Input
    typealias Failure = Upstream.Failure
    typealias Input = Upstream.Output

    private let _transform: (Upstream.Output) -> Downstream.Input
    private var subscribers: [AnySubscriber<Output, Failure>] = []

    /// Creates an identity subject between `up` and a downstream of matching type.
    ///
    /// - Parameters:
    ///   - up: Publisher to subscribe to.
    ///   - down: Not stored or subscribed; only its type takes part in the
    ///     generic constraints.
    convenience init<Down: Subscriber>(_ up: AnyPublisher<Upstream.Output, Upstream.Failure>, _ down: Down) where Down.Input == Downstream.Input, Upstream.Output == Down.Input {
        self.init(upstream: up) { $0 }
    }

    /// - Parameters:
    ///   - upstream: Publisher to subscribe to immediately, if any.
    ///   - transform: Conversion applied to every value before it is handed on.
    init(upstream: AnyPublisher<Upstream.Output, Upstream.Failure>? = nil, _ transform: @escaping (Upstream.Output) -> Downstream.Input) {
        self._transform = transform
        super.init()
        upstream?.subscribe(self)
    }

    /// Builds a cancellable that removes `anySubscriber` from the list. The
    /// closure captures `self` strongly.
    private func cancellable(for anySubscriber: AnySubscriber<Output, Failure>) -> Cancellable {
        return AnyCancellable {
            self.subscribers.removeAll(where: { $0 == anySubscriber })
        }
    }

    // Publisher

    /// Registers `subscriber` and hands it an `AnySubscription` whose
    /// cancellation removes it from the list again.
    func subscribe<S: Subscriber>(_ subscriber: S) where Failure == S.Failure, Output == S.Input {
        let anySubscriber = subscriber.eraseToAnySubscriber()
        subscribers.append(anySubscriber)
        subscriber.receive(
            subscription: AnySubscription(
                cancellable: cancellable(for: anySubscriber),
                publisher: self.eraseToAnyPublisher(),
                subscriber: anySubscriber))
    }

    /// Registers `subscriber` and returns a cancellable that removes it from
    /// the list. The subscriber is not sent a subscription by this variant.
    func subscribe<S: Subscriber>(subscriber: S) -> Cancellable? where Failure == S.Failure, Output == S.Input {
        let anySubscriber = subscriber.eraseToAnySubscriber()
        subscribers.append(anySubscriber)
        // Same removal logic as the other variant; reuse the helper rather than duplicating the closure.
        return cancellable(for: anySubscriber)
    }

    // Subscriber

    /// Transforms `input` once and delivers the result to every registered subscriber.
    override func receive(_ input: Input) {
        let transformedValue = _transform(input)
        subscribers.forEach { $0.receive(transformedValue) }
    }

    // Subject

    /// Feeds `input` through `receive(_:)`, exactly as if it had come from upstream.
    func send(_ input: Upstream.Output) {
        receive(input)
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment