Last active
August 29, 2022 02:30
-
-
Save marcpalmer/5bf8d375cc516b605ec9468f17d287d8 to your computer and use it in GitHub Desktop.
A Combine operator that works like receive(on:) but only hops on the scheduler after the initial subscribe process completes
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Foundation | |
import Combine | |
/// Usage example: | |
/// | |
/// ``` | |
/// source | |
/// .receiveAsyncResults(on: DispatchQueue.main) | |
/// .sink { | |
/// print("Value: \($0)") | |
/// } | |
/// .store(in: &cancellables) | |
/// ``` | |
extension Publishers { | |
/// A publisher that publishes any values that are immediately available on the current thread/queue,
/// while anything received after the subscription process completes is published on a different scheduler.
///
/// This allows you to perform immediate state updates at the point of subscribing without having
/// `.receive(on:)` force every single result to require a scheduler hop.
public struct ReceiveAsyncResultsOn<Upstream: Publisher, SchedulerType: Scheduler>: Publisher {
    public typealias Output = Upstream.Output
    public typealias Failure = Upstream.Failure

    /// The publisher whose output is being forwarded.
    public let upstream: Upstream

    /// The scheduler handed to the proxy for results that arrive after the initial subscribe.
    public let scheduler: SchedulerType

    /// Creates the publisher.
    /// - Parameters:
    ///   - upstream: The publisher to forward results from.
    ///   - scheduler: The scheduler used for results that arrive after subscription completes.
    public init(upstream: Upstream, scheduler: SchedulerType) {
        self.upstream = upstream
        self.scheduler = scheduler
    }

    /// Attaches `subscriber` by inserting a `ProxySubscription` between it and `upstream`.
    ///
    /// The proxy is subscribed to `upstream` first and is then handed to `subscriber`
    /// as its subscription, so demand from `subscriber` flows through the proxy.
    public func receive<S: Subscriber>(subscriber: S) where S.Failure == Upstream.Failure, S.Input == Upstream.Output {
        let proxy = ProxySubscription(asyncScheduler: scheduler, target: subscriber)

        // Upstream side: the proxy acts as the subscriber.
        upstream.subscribe(proxy)

        // Downstream side: the proxy acts as the subscription.
        subscriber.receive(subscription: proxy)
    }
}
/// Sits between the upstream publisher and the downstream `target`: it is the subscriber
/// of the former and the subscription of the latter.
///
/// Values and completions that arrive while the first `request(_:)` call is still running
/// are forwarded to `target` inline. Everything that arrives later is forwarded through
/// `asyncScheduler`.
public class ProxySubscription<Target: Subscriber, SchedulerType>: Subscription, Subscriber where SchedulerType: Scheduler {
    public typealias Input = Target.Input
    public typealias Failure = Target.Failure

    /// Guards the mutable state below. It is never held while calling into the
    /// upstream subscription or `target`, because those calls can re-enter this object.
    private let lock = NSLock()
    private var upstreamSubscription: Subscription?
    /// Demand requested by `target` before the upstream subscription was received.
    private var pendingDemand = Subscribers.Demand.none
    /// `true` until the first `request(_:)` call has finished (or had to be deferred).
    private var isFirstDemandRequest = true
    private var isCancelled = false
    private let asyncScheduler: SchedulerType
    private let target: Target

    init(asyncScheduler: SchedulerType, target: Target) {
        self.asyncScheduler = asyncScheduler
        self.target = target
    }

    /// Runs `body` while holding `lock`.
    private func synchronized<T>(_ body: () -> T) -> T {
        lock.lock()
        defer { lock.unlock() }
        return body()
    }

    // MARK: - Subscription

    /// Forwards `demand` to the upstream subscription.
    ///
    /// If the upstream subscription has not been received yet, the demand is remembered and
    /// forwarded from `receive(subscription:)` instead of crashing. Requests made after
    /// `cancel()` are ignored.
    public func request(_ demand: Subscribers.Demand) {
        lock.lock()
        if isCancelled {
            lock.unlock()
            return
        }
        guard let upstream = upstreamSubscription else {
            // Upstream has not handed over its subscription yet. Whatever it produces
            // later cannot be part of the synchronous subscribe phase, so that phase ends here.
            pendingDemand += demand
            isFirstDemandRequest = false
            lock.unlock()
            return
        }
        lock.unlock()

        // Any values upstream emits synchronously are received inside this call,
        // while `isFirstDemandRequest` is still true.
        upstream.request(demand)
        synchronized { isFirstDemandRequest = false }
    }

    /// Cancels the upstream subscription, releases it, and suppresses values that are
    /// still waiting on the scheduler.
    public func cancel() {
        lock.lock()
        let upstream = upstreamSubscription
        upstreamSubscription = nil
        pendingDemand = .none
        isCancelled = true
        lock.unlock()

        upstream?.cancel()
    }

    // MARK: - Subscriber

    /// Forwards `completion` to `target`: inline during the initial phase, otherwise through
    /// `asyncScheduler` so it is enqueued behind values that were scheduled earlier.
    /// NOTE(review): the ordering relies on the scheduler running actions in submission order
    /// (e.g. a serial queue) — confirm for the schedulers in use.
    public func receive(completion: Subscribers.Completion<Target.Failure>) {
        let deliverInline = synchronized { isFirstDemandRequest }
        guard !deliverInline else {
            target.receive(completion: completion)
            return
        }
        asyncScheduler.schedule { [weak self] in
            guard let self = self else { return }
            if self.synchronized({ self.isCancelled }) { return }
            self.target.receive(completion: completion)
        }
    }

    /// Forwards `input` to `target`.
    ///
    /// - Returns: The demand returned by `target` when delivered inline. For scheduled
    ///   deliveries this returns `.none`, and any additional demand from `target` is
    ///   requested from upstream once the value has actually been delivered.
    public func receive(_ input: Input) -> Subscribers.Demand {
        let deliverInline = synchronized { isFirstDemandRequest }
        if deliverInline {
            return target.receive(input)
        }

        asyncScheduler.schedule { [weak self] in
            guard let self = self else { return }
            // Skip values that were queued before the downstream cancelled.
            if self.synchronized({ self.isCancelled }) { return }

            let newDemand = self.target.receive(input)
            guard newDemand != .none else { return }
            let upstream = self.synchronized { self.upstreamSubscription }
            upstream?.request(newDemand)
        }
        return .none
    }

    /// Stores the upstream subscription and forwards any demand that was requested before
    /// it arrived. A subscription received after cancellation, or a second subscription,
    /// is cancelled immediately.
    public func receive(subscription: Subscription) {
        lock.lock()
        guard upstreamSubscription == nil, !isCancelled else {
            lock.unlock()
            subscription.cancel()
            return
        }
        upstreamSubscription = subscription
        let demand = pendingDemand
        pendingDemand = .none
        lock.unlock()

        if demand != .none {
            subscription.request(demand)
        }
    }
}
} | |
extension Publisher {
    /// Wraps this publisher in a `Publishers.ReceiveAsyncResultsOn` that uses `scheduler`
    /// for results received after the subscription process completes.
    ///
    /// - Parameter scheduler: The scheduler passed to the wrapping publisher.
    /// - Returns: A `Publishers.ReceiveAsyncResultsOn` whose upstream is this publisher.
    public func receiveAsyncResults<S: Scheduler>(on scheduler: S) -> Publishers.ReceiveAsyncResultsOn<Self, S> {
        .init(upstream: self, scheduler: scheduler)
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment