Skip to content

Instantly share code, notes, and snippets.

@darrarski
Last active February 15, 2020 08:08
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save darrarski/457e675e78f515e25e204b8f54bc3a79 to your computer and use it in GitHub Desktop.
Save darrarski/457e675e78f515e25e204b8f54bc3a79 to your computer and use it in GitHub Desktop.
CustomPublisher - customizable publisher for Combine framework
import Combine
public final class CustomPublisher<Output, Failure>: Publisher where Failure: Error {
public init(subscribe subscribeClosure: @escaping (AnySubscriber<Output, Failure>) -> Subscription) {
self.subscribeClosure = subscribeClosure
}
public func receive<S>(subscriber: S) where S: Combine.Subscriber, S.Input == Output, S.Failure == Failure {
let subscription = subscribeClosure(AnySubscriber(subscriber))
subscriber.receive(subscription: subscription)
}
private let subscribeClosure: (AnySubscriber<Output, Failure>) -> Subscription
}
public extension CustomPublisher {
convenience init(
request requestClosure: @escaping (AnySubscriber<Output, Failure>, Subscribers.Demand) -> Void,
cancel cancelClosure: @escaping () -> Void = {},
deinit deinitClosure: @escaping () -> Void = {}
) {
self.init { subscriber in
CustomSubscription(
subscriber,
request: requestClosure,
cancel: cancelClosure,
deinit: deinitClosure
)
}
}
}
public final class CustomSubscription<Output, Failure>: Subscription where Failure: Error {
public init(
_ subscriber: AnySubscriber<Output, Failure>,
request requestClosure: @escaping (AnySubscriber<Output, Failure>, Subscribers.Demand) -> Void,
cancel cancelClosure: @escaping () -> Void = {},
deinit deinitClosure: @escaping () -> Void = {}
) {
self.subscriber = subscriber
self.requestClosure = requestClosure
self.cancelClosure = cancelClosure
self.deinitClosure = deinitClosure
}
deinit {
deinitClosure()
}
public func request(_ demand: Subscribers.Demand) {
if let subscriber = subscriber {
requestClosure(subscriber, demand)
}
}
public func cancel() {
subscriber = nil
cancelClosure()
}
private var subscriber: AnySubscriber<Output, Failure>?
private let requestClosure: (AnySubscriber<Output, Failure>, Subscribers.Demand) -> Void
private let cancelClosure: () -> Void
private let deinitClosure: () -> Void
}
import Combine
import Foundation
func log(_ text: String) {
let thread = Thread.isMainThread ? "[main thread] " : "[other thread]"
print("\(thread) \(text)")
}
func createPublisher() -> AnyPublisher<String, Error> {
CustomPublisher(subscribe: { subscriber in
log("[PUBLISHER-SUBSCRIBE] subscriber: \(subscriber)")
return CustomSubscription(subscriber, request: { subscriber, demand in
log("[SUBSCRIPTION-REQUEST] subscriber: \(subscriber) demand: \(demand)")
DispatchQueue.global().asyncAfter(deadline: .now() + 1) {
let value = "Text A"
log("[SUBSCRIPTION-PRODUCE] value: \(value)")
_ = subscriber.receive(value)
}
DispatchQueue.global().asyncAfter(deadline: .now() + 2) {
let value = "Text B"
log("[SUBSCRIPTION-PRODUCE] value: \(value)")
_ = subscriber.receive(value)
}
DispatchQueue.global().asyncAfter(deadline: .now() + 3) {
let completion = Subscribers.Completion<Error>.finished
log("[SUBSCRIPTION-COMPLETE] completion: \(completion)")
subscriber.receive(completion: .finished)
}
}, cancel: {
log("[SUBSCRIPTION-CANCEL]")
}, deinit: {
log("[SUBSCRIPTION-DEINIT]")
})
}).eraseToAnyPublisher()
}
let subscription = createPublisher()
.subscribe(on: DispatchQueue.global(qos: .background))
.receive(on: DispatchQueue.main)
.sink(receiveCompletion: { completion in
log("[SINK-RECEIVED] completion: \(completion)")
}, receiveValue: { value in
log("[SINK-RECEIVED] value: \(value)")
})
// Prints:
// -------
// [other thread] [PUBLISHER-SUBSCRIBE] subscriber: SubscribeOn
// [other thread] [SUBSCRIPTION-REQUEST] subscriber: SubscribeOn demand: unlimited
// [other thread] [SUBSCRIPTION-PRODUCE] value: Text A
// [main thread] [SINK-RECEIVED] value: Text A
// [other thread] [SUBSCRIPTION-PRODUCE] value: Text B
// [main thread] [SINK-RECEIVED] value: Text B
// [other thread] [SUBSCRIPTION-COMPLETE] completion: finished
// [other thread] [SUBSCRIPTION-DEINIT]
// [main thread] [SINK-RECEIVED] completion: finished
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment