Last active
September 16, 2019 11:19
-
-
Save groue/dc2c8dc7847dd9019d0df5bfd1f76967 to your computer and use it in GitHub Desktop.
DON'T USE - DOES NOT WORK - Latest WIP at https://github.com/groue/GRDBCombine/blob/a002d9ef4723f80a3c986e5373a643e3bf51aed8/Sources/GRDBCombine/ReceiveValuesOn.swift
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Combine
import Foundation
/// A publisher that delivers values to its downstream subscriber on a
/// specific scheduler.
///
/// Unlike Combine's `Publishers.ReceiveOn`, `ReceiveValuesOn` only re-schedules
/// values and completion. It does not re-schedule the subscription: the
/// upstream subscription is handed to the downstream subscriber directly.
struct ReceiveValuesOn<Upstream: Publisher, Context: Scheduler>: Publisher {
    typealias Output = Upstream.Output
    typealias Failure = Upstream.Failure

    /// The publisher whose values and completion are re-scheduled.
    fileprivate let upstream: Upstream

    /// The scheduler on which values and completion are delivered.
    fileprivate let context: Context

    /// Scheduler options passed along with every scheduled delivery.
    fileprivate let options: Context.SchedulerOptions?

    /// Attaches `subscriber` to the upstream publisher through an intermediate
    /// subscriber that re-schedules values and completion on `context`.
    ///
    /// - Parameter subscriber: The downstream subscriber.
    func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input {
        let rescheduling = ReceiveValuesOnSubscriber(
            downstream: subscriber,
            context: context,
            options: options)

        // `subscribe(_:)` is the documented way to attach a subscriber to a
        // publisher; `receive(subscriber:)` is the customization point that
        // publishers implement, not the method clients are meant to call.
        upstream.subscribe(rescheduling)
    }
}
/// The subscriber that `ReceiveValuesOn` inserts between its upstream
/// publisher and the downstream subscriber.
///
/// The subscription is forwarded to the downstream subscriber synchronously.
/// Values and completion are delivered to it from actions scheduled on
/// `context`.
///
/// This is a class (not a struct) because it has to remember the upstream
/// subscription in order to forward the demand returned by the downstream
/// subscriber once a scheduled value has actually been delivered.
private final class ReceiveValuesOnSubscriber<Downstream: Subscriber, Context: Scheduler>: Subscriber {
    typealias Input = Downstream.Input
    typealias Failure = Downstream.Failure

    private let downstream: Downstream
    private let context: Context
    private let options: Context.SchedulerOptions?

    /// Protects `subscription`, which is written by the upstream and read
    /// from actions running on `context`.
    private let lock = NSLock()

    /// The upstream subscription, kept until completion is received.
    private var subscription: Subscription?

    /// - Parameters:
    ///   - downstream: The subscriber that eventually receives the events.
    ///   - context: The scheduler on which values and completion are delivered.
    ///   - options: Scheduler options passed along with every scheduled delivery.
    init(downstream: Downstream, context: Context, options: Context.SchedulerOptions?) {
        self.downstream = downstream
        self.context = context
        self.options = options
    }

    /// Remembers the upstream subscription, then forwards it to the
    /// downstream subscriber right away, without going through the scheduler.
    func receive(subscription: Subscription) {
        lock.lock()
        self.subscription = subscription
        lock.unlock()

        downstream.receive(subscription: subscription)
    }

    /// Schedules the delivery of `input` to the downstream subscriber.
    ///
    /// - Returns: Always `.none`. The demand returned by a subscriber is
    ///   added to its current demand, and the downstream's answer is not
    ///   known until the scheduled action runs. The downstream's additional
    ///   demand is therefore forwarded to the upstream subscription from the
    ///   scheduled action instead of being returned here.
    func receive(_ input: Input) -> Subscribers.Demand {
        context.schedule(options: options) {
            let additionalDemand = self.downstream.receive(input)
            guard additionalDemand > .none else { return }

            self.lock.lock()
            let upstreamSubscription = self.subscription
            self.lock.unlock()

            // Requested outside of the lock so that a synchronous upstream
            // can call back into this subscriber without deadlocking.
            upstreamSubscription?.request(additionalDemand)
        }
        return .none
    }

    /// Releases the upstream subscription and schedules the delivery of
    /// `completion` to the downstream subscriber.
    func receive(completion: Subscribers.Completion<Failure>) {
        lock.lock()
        subscription = nil
        lock.unlock()

        context.schedule(options: options) {
            self.downstream.receive(completion: completion)
        }
    }
}
extension Publisher {
    /// Specifies the scheduler on which to receive values and completion
    /// from the publisher.
    ///
    /// Contrary to the stock `receive(on:options:)` Combine method, only
    /// values and completion are re-scheduled. Subscriptions are not.
    ///
    /// - Parameters:
    ///   - scheduler: The scheduler on which values and completion are delivered.
    ///   - options: Scheduler options for the deliveries. Defaults to `nil`.
    /// - Returns: A publisher that delivers values and completion on `scheduler`.
    func receiveValues<S: Scheduler>(
        on scheduler: S,
        options: S.SchedulerOptions? = nil
    ) -> ReceiveValuesOn<Self, S> {
        ReceiveValuesOn<Self, S>(
            upstream: self,
            context: scheduler,
            options: options)
    }
}
// -----------------------------------------------------------------------------
// MARK: - Test

import Foundation

// Subject used to push a test value through the pipeline by hand.
let control = PassthroughSubject<Int, Never>()

// The value returned by `sink` is stored in `pipe` for the whole run.
let pipe = control
    .receiveValues(on: RunLoop.main)
    .sink { value in print(value) }

// Sent before the run loop starts: delivery is scheduled on `RunLoop.main`.
control.send(233)

// Run the main run loop so that the scheduled delivery gets executed.
RunLoop.main.run()
// prints "233" 🥳
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment