Last active
November 22, 2021 23:34
-
-
Save darrarski/1dd5588ea33d5456179e899c5be4b714 to your computer and use it in GitHub Desktop.
SimplePublisher - custom publisher for Combine framework
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Combine | |
public final class SimplePublisher<Output, Failure>: Publisher where Failure: Error { | |
public init(_ closure: @escaping (Receiver<Output, Failure>) -> Disposable) { | |
self.closure = closure | |
} | |
public func receive<S>(subscriber: S) where S: Subscriber, S.Input == Output, S.Failure == Failure { | |
subscriber.receive(subscription: Subscription(subscriber: subscriber, closure: closure)) | |
} | |
private let closure: (Receiver<Output, Failure>) -> Disposable | |
public final class Receiver<Input, Failure> where Failure: Error { | |
public init<S>(_ subscriber: S) where S: Subscriber, S.Input == Input, S.Failure == Failure { | |
receiveInput = subscriber.receive(_:) | |
receiveCompletion = subscriber.receive(completion:) | |
} | |
@discardableResult | |
public func receive(_ input: Input) -> Subscribers.Demand { | |
receiveInput(input) | |
} | |
public func receive(completion: Subscribers.Completion<Failure>) { | |
receiveCompletion(completion) | |
} | |
private let receiveInput: (Input) -> Subscribers.Demand | |
private let receiveCompletion: (Subscribers.Completion<Failure>) -> Void | |
} | |
public final class Disposable { | |
public static func empty() -> Disposable { | |
Disposable {} | |
} | |
public init(_ onDeinit: @escaping () -> Void) { | |
self.onDeinit = onDeinit | |
} | |
deinit { onDeinit() } | |
private let onDeinit: () -> Void | |
} | |
final class Subscription<SubscriberType, Input, Failure>: Combine.Subscription | |
where SubscriberType: Subscriber, SubscriberType.Input == Input, SubscriberType.Failure == Failure { | |
init(subscriber: SubscriberType, closure: @escaping (Receiver<Input, Failure>) -> Disposable) { | |
self.subscriber = subscriber | |
self.closure = closure | |
} | |
func request(_ demand: Subscribers.Demand) { | |
guard demand > 0, let subscriber = subscriber else { return } | |
disposables.append(closure(Receiver(subscriber))) | |
} | |
func cancel() { | |
subscriber = nil | |
disposables.removeAll() | |
} | |
private var subscriber: SubscriberType? | |
private let closure: (Receiver<Input, Failure>) -> Disposable | |
private var disposables = [Disposable]() | |
} | |
} | |
public extension SimplePublisher.Receiver where Input == Void { | |
@discardableResult | |
func receive() -> Subscribers.Demand { | |
receive(()) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Combine | |
import Foundation | |
func log(_ text: String) { | |
let thread = Thread.isMainThread ? "main thread" : "other thread" | |
print("[\(thread)] \(text)") | |
} | |
func createPublisher() -> AnyPublisher<String, Error> { | |
SimplePublisher { subscriber in | |
log("SUBSCRIBED") | |
DispatchQueue.global().asyncAfter(deadline: .now() + 1) { | |
subscriber.receive("Test A") | |
} | |
DispatchQueue.global().asyncAfter(deadline: .now() + 2) { | |
subscriber.receive("Test B") | |
} | |
DispatchQueue.global().asyncAfter(deadline: .now() + 3) { | |
subscriber.receive(completion: .finished) | |
} | |
return SimplePublisher.Disposable { | |
log("DISPOSED") | |
} | |
}.eraseToAnyPublisher() | |
} | |
let subscription = createPublisher() | |
.subscribe(on: DispatchQueue.global(qos: .background)) | |
.receive(on: DispatchQueue.main) | |
.sink(receiveCompletion: { completion in | |
switch completion { | |
case .failure(let error): | |
log("RECEIVED FAILURE: \(error)") | |
case .finished: | |
log("RECEIVED COMPLETION") | |
} | |
}, receiveValue: { value in | |
log("RECEIVED VALUE: \(value)") | |
}) | |
// Prints: | |
// ------- | |
// [other thread] SUBSCRIBED | |
// [main thread] RECEIVED VALUE: Test A | |
// [main thread] RECEIVED VALUE: Test B | |
// [other thread] DISPOSED | |
// [main thread] RECEIVED COMPLETION |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment