Skip to content

Instantly share code, notes, and snippets.

@marcpalmer
Created January 28, 2023 15:30
Show Gist options
  • Save marcpalmer/6bb596d3cf7ccc3cce7dd751935ec264 to your computer and use it in GitHub Desktop.
Save marcpalmer/6bb596d3cf7ccc3cce7dd751935ec264 to your computer and use it in GitHub Desktop.
import Foundation
import Combine
extension Publishers {
    /// A publisher that publishes any values that are immediately available synchronously on the
    /// current thread/queue, while anything received after the first demand request has returned
    /// is published on a different scheduler.
    ///
    /// This allows you to perform immediate state updates at the point of subscribing without
    /// having `.receive(on:)` force every single result to require a scheduler hop.
    ///
    /// - Note: The relative order of asynchronously delivered events relies on `scheduler`
    ///   executing its actions in FIFO order (for example a serial `DispatchQueue`).
    public struct ReceiveAsyncResultsOn<Upstream, SchedulerType>: Publisher where Upstream: Publisher, SchedulerType: Scheduler {
        public typealias Output = Upstream.Output
        public typealias Failure = Upstream.Failure

        /// The publisher whose events are forwarded.
        public let upstream: Upstream

        /// The scheduler used for every event that arrives after the synchronous phase.
        public let scheduler: SchedulerType

        /// - Parameters:
        ///   - upstream: The publisher to forward events from.
        ///   - scheduler: Where events that are not immediately available get delivered.
        public init(upstream: Upstream,
                    scheduler: SchedulerType) {
            self.upstream = upstream
            self.scheduler = scheduler
        }

        public func receive<S>(subscriber: S) where S : Subscriber, Upstream.Failure == S.Failure, Upstream.Output == S.Input {
            // Subscribe our proxy to the upstream pipeline first, so that it normally already
            // holds the upstream subscription by the time the subscriber asks for demand.
            let proxy = ProxySubscription(asyncScheduler: scheduler, target: subscriber)
            upstream.subscribe(proxy)

            // Hand the proxy to the subscriber as its subscription.
            subscriber.receive(subscription: proxy)
        }
    }

    /// Sits between the upstream publisher and the downstream `Target`: it is the subscriber of
    /// the former and the subscription of the latter.
    ///
    /// Events that arrive while the first demand request is still in progress are passed straight
    /// through on the calling thread. Every later event (values *and* completion) is delivered via
    /// `asyncScheduler`, so a completion can never overtake values that are still queued.
    public class ProxySubscription<Target: Subscriber, SchedulerType>: Subscription, Subscriber where SchedulerType: Scheduler {
        public typealias Input = Target.Input
        public typealias Failure = Target.Failure

        /// Protects the mutable state below. It is never held while calling out to the upstream
        /// subscription or to `target`, so re-entrant calls cannot deadlock.
        private let lock = NSLock()
        private var upstreamSubscription: Subscription?
        /// Demand requested before the upstream subscription arrived.
        private var pendingDemand: Subscribers.Demand = .none
        /// `true` until the first demand request has returned, i.e. during the synchronous phase.
        private var isFirstDemandRequest = true
        private var isCancelled = false
        private let asyncScheduler: SchedulerType
        private let target: Target

        init(asyncScheduler: SchedulerType, target: Target) {
            self.asyncScheduler = asyncScheduler
            self.target = target
        }

        /// Thread-safe snapshot of the cancelled flag.
        private var hasBeenCancelled: Bool {
            lock.lock()
            defer { lock.unlock() }
            return isCancelled
        }

        // MARK: - Subscription

        /// Forwards `demand` upstream. Values produced while this call is in progress for the
        /// first time are delivered synchronously; afterwards everything is asynchronous.
        public func request(_ demand: Subscribers.Demand) {
            lock.lock()
            if isCancelled {
                lock.unlock()
                return
            }
            guard let upstream = upstreamSubscription else {
                // Upstream has not handed us its subscription yet (e.g. it subscribes
                // asynchronously). Remember the demand instead of crashing; since nothing can be
                // delivered synchronously any more, the synchronous phase is over.
                pendingDemand += demand
                isFirstDemandRequest = false
                lock.unlock()
                return
            }
            lock.unlock()

            upstream.request(demand) // This may synchronously start spraying results at us

            lock.lock()
            isFirstDemandRequest = false
            lock.unlock()
        }

        /// Cancels the upstream subscription and suppresses any deliveries that are still queued
        /// on `asyncScheduler`.
        public func cancel() {
            lock.lock()
            isCancelled = true
            let upstream = upstreamSubscription
            upstreamSubscription = nil
            pendingDemand = .none
            lock.unlock()

            upstream?.cancel()
        }

        // MARK: - Subscriber

        public func receive(subscription: Subscription) {
            // We are passed the upstream subscription here, and forward demand requests to it.
            lock.lock()
            if isCancelled {
                // The downstream cancelled before upstream was ready: tear upstream down now.
                lock.unlock()
                subscription.cancel()
                return
            }
            upstreamSubscription = subscription
            let bufferedDemand = pendingDemand
            pendingDemand = .none
            lock.unlock()

            if bufferedDemand != .none {
                subscription.request(bufferedDemand)
            }
        }

        public func receive(_ input: Input) -> Subscribers.Demand {
            lock.lock()
            let cancelled = isCancelled
            let deliverSynchronously = isFirstDemandRequest
            lock.unlock()

            guard !cancelled else {
                return .none
            }
            if deliverSynchronously {
                return target.receive(input)
            }

            // `self` is captured strongly on purpose: the proxy must outlive upstream releasing
            // it so that queued events still reach the target. Cancellation is what stops them.
            asyncScheduler.schedule {
                guard !self.hasBeenCancelled else {
                    return
                }
                let newDemand = self.target.receive(input)
                guard newDemand != .none else {
                    return
                }
                self.lock.lock()
                let upstream = self.upstreamSubscription
                self.lock.unlock()
                upstream?.request(newDemand)
            }
            return .none
        }

        public func receive(completion: Subscribers.Completion<Target.Failure>) {
            lock.lock()
            let cancelled = isCancelled
            let deliverSynchronously = isFirstDemandRequest
            lock.unlock()

            guard !cancelled else {
                return
            }
            if deliverSynchronously {
                target.receive(completion: completion)
                return
            }

            // Go through the same scheduler as the values so the terminal event is queued behind
            // any values that have not been delivered yet.
            asyncScheduler.schedule {
                guard !self.hasBeenCancelled else {
                    return
                }
                self.target.receive(completion: completion)
            }
        }
    }
}
extension Publisher {
    /// Wraps this publisher in a `Publishers.ReceiveAsyncResultsOn` that uses `scheduler` for
    /// results which are not immediately available.
    ///
    /// Usage: `publisher.receiveAsyncResults(on: DispatchQueue.main)`
    ///
    /// Results the publisher can deliver while subscribing are received immediately on the
    /// current queue, whatever that is, so the first delivery does not require an async hop.
    /// Later results are received asynchronously on `scheduler`.
    ///
    /// Care must be taken to make the subscription processing thread-safe, as it may be called
    /// on the publisher's queue or on your desired async queue.
    ///
    /// - Parameter scheduler: The scheduler on which later results are received.
    /// - Returns: A publisher wrapping `self` and `scheduler`.
    public func receiveAsyncResults<S>(on scheduler: S) -> Publishers.ReceiveAsyncResultsOn<Self, S> where S: Scheduler {
        .init(upstream: self, scheduler: scheduler)
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment