Skip to content

Instantly share code, notes, and snippets.

@groue
Last active September 16, 2019 11:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save groue/dc2c8dc7847dd9019d0df5bfd1f76967 to your computer and use it in GitHub Desktop.
Save groue/dc2c8dc7847dd9019d0df5bfd1f76967 to your computer and use it in GitHub Desktop.
import Combine
/// A publisher that delivers values to its downstream subscriber on a
/// specific scheduler.
///
/// Unlike Combine's Publishers.ReceiveOn, ReceiveValuesOn only re-schedule
/// values and completion. It does not re-schedule subscription.
struct ReceiveValuesOn<Upstream: Publisher, Context: Scheduler>: Publisher {
typealias Output = Upstream.Output
typealias Failure = Upstream.Failure
fileprivate let upstream: Upstream
fileprivate let context: Context
fileprivate let options: Context.SchedulerOptions?
func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
upstream.receive(subscriber: ReceiveValuesOnSubscriber(
downstream: subscriber,
context: context,
options: options))
}
}
private struct ReceiveValuesOnSubscriber<Downstream: Subscriber, Context: Scheduler>: Subscriber {
let combineIdentifier = CombineIdentifier()
fileprivate let downstream: Downstream
fileprivate let context: Context
fileprivate let options: Context.SchedulerOptions?
func receive(subscription: Subscription) {
downstream.receive(subscription: subscription)
}
func receive(_ input: Downstream.Input) -> Subscribers.Demand {
context.schedule(options: options) {
_ = self.downstream.receive(input)
}
// TODO: what problem are we creating by returning .unlimited and
// ignoring downstream's result?
//
// `Publisher.receive(on:options:)` does not document its behavior
// regarding backpressure.
return .unlimited
}
func receive(completion: Subscribers.Completion<Downstream.Failure>) {
context.schedule(options: options) {
self.downstream.receive(completion: completion)
}
}
}
extension Publisher {
/// Specifies the scheduler on which to receive values from the publisher
///
/// The difference with the stock `receive(on:options:)` Combine method is
/// that only values and completion are re-scheduled.
/// Subscriptions are not.
func receiveValues<S: Scheduler>(on scheduler: S, options: S.SchedulerOptions? = nil) -> ReceiveValuesOn<Self, S> {
return ReceiveValuesOn(upstream: self, context: scheduler, options: options)
}
}
// -----------------------------------------------------------------------------
// MARK: - Test
import Foundation
let control = PassthroughSubject<Int,Never>()
let pipe = control
.receiveValues(on: RunLoop.main)
.sink(receiveValue: { print($0) })
control.send(233)
RunLoop.main.run()
// prints "233" 🥳
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment