Skip to content

Instantly share code, notes, and snippets.

@marcpalmer
Last active August 29, 2022 02:30
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save marcpalmer/5bf8d375cc516b605ec9468f17d287d8 to your computer and use it in GitHub Desktop.
Save marcpalmer/5bf8d375cc516b605ec9468f17d287d8 to your computer and use it in GitHub Desktop.
A Combine operator that works like receive(on:) but only hops to the scheduler after the initial subscribe process completes.
import Foundation
import Combine
/// Usage example:
///
/// ```
/// source
/// .receiveAsyncResults(on: DispatchQueue.main)
/// .sink {
/// print("Value: \($0)")
/// }
/// .store(in: &cancellables)
/// ```
extension Publishers {
/// A publisher that publishes any values that are immediately available on the current
/// thread/queue, while anything received after the subscription process completes is
/// published on a different scheduler.
///
/// This allows you to perform immediate state updates at the point of subscribing without
/// having `.receive(on:)` force every single result through a scheduler hop.
public struct ReceiveAsyncResultsOn<Upstream: Publisher, SchedulerType: Scheduler>: Publisher {
    public typealias Output = Upstream.Output
    public typealias Failure = Upstream.Failure

    /// The publisher whose events are forwarded downstream.
    public let upstream: Upstream

    /// The scheduler handed to the proxy for results that arrive after subscribing.
    public let scheduler: SchedulerType

    /// Creates the operator.
    /// - Parameters:
    ///   - upstream: The publisher to forward events from.
    ///   - scheduler: The scheduler passed to the proxy as its `asyncScheduler`.
    public init(upstream: Upstream, scheduler: SchedulerType) {
        self.upstream = upstream
        self.scheduler = scheduler
    }

    /// Attaches `subscriber` to `upstream` through a `ProxySubscription`.
    ///
    /// The proxy is subscribed upstream first and only then handed to `subscriber` as its
    /// subscription; the order of these two calls is significant and must be kept.
    public func receive<S: Subscriber>(subscriber: S) where S.Failure == Failure, S.Input == Output {
        let proxy = ProxySubscription(asyncScheduler: scheduler, target: subscriber)

        // Step 1: the proxy becomes the subscriber of the upstream pipeline.
        upstream.subscribe(proxy)

        // Step 2: the downstream subscriber receives the proxy as its subscription.
        subscriber.receive(subscription: proxy)
    }
}
/// Sits between an upstream publisher and a downstream `Target`: it is the upstream's
/// subscriber and, at the same time, the subscription handed to `target`.
///
/// Values and completions that arrive before the first `request(_:)` call has returned are
/// passed straight to `target` on the calling thread. Everything that arrives afterwards is
/// delivered through `asyncScheduler`, so a completion can never overtake values that are
/// still waiting on the scheduler.
///
/// - Note: The mutable state below is not protected by a lock.
public class ProxySubscription<Target: Subscriber, SchedulerType>: Subscription, Subscriber where SchedulerType: Scheduler {
    public typealias Input = Target.Input
    public typealias Failure = Target.Failure

    /// Subscription received from upstream; `nil` before it arrives and after
    /// cancellation or completion.
    private var upstreamSubscription: Subscription?
    /// Demand requested by `target` while no upstream subscription was available yet.
    private var pendingDemand = Subscribers.Demand.none
    /// `true` until the first `request(_:)` call has returned; while `true`, events are
    /// forwarded to `target` directly instead of via `asyncScheduler`.
    private var isFirstDemandRequest = true
    /// Set by `cancel()`; suppresses every later delivery to `target`.
    private var isCancelled = false
    /// Set as soon as upstream sends its completion.
    private var hasCompleted = false
    private let asyncScheduler: SchedulerType
    private let target: Target

    /// - Parameters:
    ///   - asyncScheduler: Scheduler used for events arriving after the first demand request.
    ///   - target: The downstream subscriber that ultimately receives all events.
    init(asyncScheduler: SchedulerType, target: Target) {
        self.asyncScheduler = asyncScheduler
        self.target = target
    }

    // MARK: - Subscription

    /// Forwards `demand` upstream. Events produced before this method returns for the first
    /// time are delivered to `target` directly.
    ///
    /// If the upstream subscription has not been received yet, the demand is remembered and
    /// forwarded from `receive(subscription:)` instead of crashing.
    public func request(_ demand: Subscribers.Demand) {
        guard !isCancelled, !hasCompleted else {
            return
        }
        guard let upstreamSubscription = upstreamSubscription else {
            pendingDemand += demand
            // The caller's request returns without any result having been delivered, so
            // whatever upstream produces later is no longer "immediately available".
            isFirstDemandRequest = false
            return
        }
        upstreamSubscription.request(demand) // This may deliver results re-entrantly
        isFirstDemandRequest = false
    }

    /// Cancels the upstream subscription, releases it, and stops all further deliveries,
    /// including ones already handed to `asyncScheduler`.
    public func cancel() {
        isCancelled = true
        pendingDemand = .none
        let upstream = upstreamSubscription
        upstreamSubscription = nil
        upstream?.cancel()
    }

    // MARK: - Subscriber

    /// Forwards the completion to `target`: directly while the first demand request is
    /// still in flight, otherwise via `asyncScheduler` so it stays ordered behind values
    /// that were scheduled earlier.
    public func receive(completion: Subscribers.Completion<Target.Failure>) {
        guard !isCancelled, !hasCompleted else {
            return
        }
        hasCompleted = true
        // Upstream is finished; drop our reference so it can be released.
        upstreamSubscription = nil

        if isFirstDemandRequest {
            target.receive(completion: completion)
            return
        }
        // `self` is captured strongly on purpose: the terminal event must not be lost if
        // upstream drops its reference to us right after completing. The closure is
        // released once the scheduler has run it.
        asyncScheduler.schedule {
            guard !self.isCancelled else {
                return
            }
            self.target.receive(completion: completion)
        }
    }

    /// Forwards `input` to `target`: directly (returning the target's demand) while the
    /// first demand request is still in flight, otherwise via `asyncScheduler`.
    ///
    /// - Returns: The target's additional demand for direct deliveries, `.none` for
    ///   scheduled ones (their demand is requested from upstream after delivery).
    public func receive(_ input: Input) -> Subscribers.Demand {
        guard !isCancelled else {
            return .none
        }
        if isFirstDemandRequest {
            return target.receive(input)
        }
        asyncScheduler.schedule { [weak self] in
            guard let strongSelf = self, !strongSelf.isCancelled else {
                return
            }
            let newDemand = strongSelf.target.receive(input)
            if newDemand != .none {
                // No-op if upstream has completed in the meantime.
                strongSelf.upstreamSubscription?.request(newDemand)
            }
        }
        return .none
    }

    /// Stores the upstream subscription and forwards any demand that `target` requested
    /// before it arrived. A subscription received after cancellation, after completion, or
    /// when one is already held is cancelled immediately.
    public func receive(subscription: Subscription) {
        guard !isCancelled, !hasCompleted, upstreamSubscription == nil else {
            subscription.cancel()
            return
        }
        upstreamSubscription = subscription

        let demand = pendingDemand
        pendingDemand = .none
        if demand != .none {
            subscription.request(demand)
        }
    }
}
}
public extension Publisher {
    /// Wraps this publisher in `Publishers.ReceiveAsyncResultsOn`.
    ///
    /// - Parameter scheduler: The scheduler passed to the wrapping publisher.
    /// - Returns: A `Publishers.ReceiveAsyncResultsOn` whose upstream is this publisher.
    func receiveAsyncResults<S: Scheduler>(on scheduler: S) -> Publishers.ReceiveAsyncResultsOn<Self, S> {
        .init(upstream: self, scheduler: scheduler)
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment