Last active
March 7, 2020 03:36
-
-
Save JadenGeller/cb14b3169c1b4ff3cef08f692a5d9f14 to your computer and use it in GitHub Desktop.
Pair Publisher in Combine
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Combine
import Foundation
extension Publisher {
    /// Wraps this publisher in a `Publishers.Pair`, which reports each value together with
    /// the value that preceded it.
    ///
    /// - Returns: A `Publishers.Pair` whose upstream is this publisher.
    func pair() -> Publishers.Pair<Self> {
        return .init(upstream: self)
    }
}
extension Publishers {
    /// A publisher that re-emits the values of `Upstream` alongside their predecessor.
    ///
    /// Failures are passed through unchanged (`Failure` is `Upstream.Failure`).
    class Pair<Upstream: Publisher>: Publisher {
        /// The values delivered to downstream subscribers.
        enum Output {
            /// The first value received from upstream; it has no predecessor.
            case first(Upstream.Output)
            /// The previously received upstream value followed by the value just received.
            case pair(Upstream.Output, Upstream.Output)
            /// The last value received from upstream, delivered when upstream completes.
            case final(Upstream.Output)
        }

        typealias Failure = Upstream.Failure

        /// The publisher whose values are paired.
        let upstream: Upstream

        /// Creates a pairing publisher over `upstream`.
        init(upstream: Upstream) {
            self.upstream = upstream
        }

        /// Subscribes an intermediate `Inner` subscriber to `upstream`; `Inner` converts the
        /// upstream values into `Output` cases and forwards them to `subscriber`.
        func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input {
            let inner = Inner(downstream: subscriber)
            upstream.subscribe(inner)
        }
    }
}
extension Publishers.Pair {
    /// Sits between `Upstream` and the downstream subscriber.
    ///
    /// Every upstream value is forwarded as `.first` (for the very first value) or
    /// `.pair(previous, current)`. When upstream completes after delivering at least one value,
    /// one extra `.final(last)` value is delivered before the completion.
    ///
    /// `Inner` is also the `Subscription` given to the downstream subscriber. That lets it
    /// observe downstream demand, so the extra `.final` value is only delivered once the
    /// downstream has asked for it: if upstream completes while no demand is outstanding, the
    /// completion is parked and replayed (after `.final`) on the next `request(_:)`.
    private final class Inner<Downstream: Subscriber>: Subscriber, Subscription where Downstream.Input == Output, Downstream.Failure == Failure {
        let combineIdentifier = CombineIdentifier()

        private let downstream: Downstream

        /// Guards all mutable state below. `request(_:)` and `cancel()` may be called from a
        /// different thread than the upstream events. The lock is never held while calling
        /// into `downstream` or the upstream subscription, so re-entrant calls cannot deadlock.
        private let lock = NSLock()

        /// The subscription received from upstream; `nil` before subscription and after
        /// upstream completed or the downstream cancelled.
        private var upstreamSubscription: Subscription?

        /// The most recent value received from upstream, if any.
        private var previousInput: Upstream.Output?

        /// Demand requested by the downstream that has not yet been satisfied.
        private var outstandingDemand: Subscribers.Demand = .none

        /// An upstream completion that is waiting for demand so `.final` can be sent first.
        private var pendingCompletion: Subscribers.Completion<Upstream.Failure>?

        /// `true` once the downstream received a completion or cancelled.
        private var isTerminated = false

        init(downstream: Downstream) {
            self.downstream = downstream
        }

        // MARK: Subscriber

        /// Stores the upstream subscription and hands `self` to the downstream as its
        /// subscription. A second subscription (or one arriving after termination) is cancelled.
        func receive(subscription: Subscription) {
            lock.lock()
            guard upstreamSubscription == nil, !isTerminated, pendingCompletion == nil else {
                lock.unlock()
                subscription.cancel()
                return
            }
            upstreamSubscription = subscription
            lock.unlock()

            downstream.receive(subscription: self)
        }

        /// Forwards `input` as `.first` or `.pair` and returns the downstream's additional demand.
        func receive(_ input: Upstream.Output) -> Subscribers.Demand {
            lock.lock()
            guard !isTerminated, pendingCompletion == nil else {
                lock.unlock()
                return .none
            }
            let output: Downstream.Input
            if let previous = previousInput {
                output = .pair(previous, input)
            } else {
                output = .first(input)
            }
            previousInput = input
            // One unit of the downstream's demand is consumed by this value.
            if outstandingDemand > .none {
                outstandingDemand -= 1
            }
            lock.unlock()

            let additional = downstream.receive(output)

            lock.lock()
            outstandingDemand += additional
            lock.unlock()

            // Upstream and downstream values map one-to-one, so the demand passes through as-is.
            return additional
        }

        /// Delivers `.final` followed by `completion`, waiting for downstream demand if needed.
        func receive(completion: Subscribers.Completion<Upstream.Failure>) {
            lock.lock()
            guard !isTerminated, pendingCompletion == nil else {
                lock.unlock()
                return
            }
            upstreamSubscription = nil

            guard let last = previousInput else {
                // Upstream finished without ever sending a value: nothing to append.
                isTerminated = true
                lock.unlock()
                downstream.receive(completion: completion)
                return
            }

            guard outstandingDemand > .none else {
                // The downstream has not asked for another value; hold the completion until
                // `request(_:)` provides demand for `.final`.
                pendingCompletion = completion
                lock.unlock()
                return
            }

            isTerminated = true
            previousInput = nil
            lock.unlock()

            // The stream ends right after this value, so the returned demand is irrelevant.
            _ = downstream.receive(.final(last))
            downstream.receive(completion: completion)
        }

        // MARK: Subscription

        /// Records the downstream's demand and forwards it upstream. If upstream has already
        /// completed and `.final` is still waiting, the demand is used to deliver it.
        func request(_ demand: Subscribers.Demand) {
            lock.lock()
            guard !isTerminated else {
                lock.unlock()
                return
            }

            if let completion = pendingCompletion, let last = previousInput {
                guard demand > .none else {
                    lock.unlock()
                    return
                }
                isTerminated = true
                pendingCompletion = nil
                previousInput = nil
                lock.unlock()

                _ = downstream.receive(.final(last))
                downstream.receive(completion: completion)
                return
            }

            outstandingDemand += demand
            let upstream = upstreamSubscription
            lock.unlock()

            upstream?.request(demand)
        }

        /// Cancels the upstream subscription and drops any buffered value or parked completion.
        func cancel() {
            lock.lock()
            guard !isTerminated else {
                lock.unlock()
                return
            }
            isTerminated = true
            let upstream = upstreamSubscription
            upstreamSubscription = nil
            previousInput = nil
            pendingCompletion = nil
            lock.unlock()

            upstream?.cancel()
        }
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
extension Publishers {
    /// The concrete publisher type returned by `pair()`: a `Scan` that carries a two-slot
    /// window of optional upstream values, mapped to `(previous, current)` tuples in which
    /// only `previous` is optional.
    typealias Pair<Upstream: Publisher> = Publishers.Map<
        Publishers.Scan<Upstream, (Upstream.Output?, Upstream.Output?)>,
        (Upstream.Output?, Upstream.Output)
    >
}
extension Publisher {
    /// Publishes each upstream value together with the value that preceded it.
    ///
    /// - Returns: A publisher of `(previous, current)` tuples; `previous` is `nil` for the
    ///   first upstream value.
    func pair() -> Publishers.Pair<Self> {
        // Two-slot sliding window; both slots start out empty.
        let seed: (Output?, Output?) = (nil, nil)
        return self
            .scan(seed) { window, newest in
                // Shift the window: the old "current" becomes "previous".
                (window.1, newest)
            }
            .map { window in
                // The second slot is always written by the scan step above with the value just
                // received, so the force-unwrap is expected to succeed for every tuple that
                // reaches this closure (assuming `scan` never publishes the seed itself).
                (window.0, window.1!)
            }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment