Skip to content

Instantly share code, notes, and snippets.

@JadenGeller
Last active March 7, 2020 03:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save JadenGeller/cb14b3169c1b4ff3cef08f692a5d9f14 to your computer and use it in GitHub Desktop.
Save JadenGeller/cb14b3169c1b4ff3cef08f692a5d9f14 to your computer and use it in GitHub Desktop.
Pair Publisher in Combine
import Combine
extension Publisher {
    /// Wraps this publisher in a `Publishers.Pair`, which republishes each
    /// upstream element together with the element that preceded it.
    ///
    /// - Returns: A `Publishers.Pair` whose upstream is this publisher.
    func pair() -> Publishers.Pair<Self> {
        return .init(upstream: self)
    }
}
extension Publishers {
    /// A publisher that republishes its upstream's elements tagged with their
    /// position relative to the neighbouring element (see `Output`).
    class Pair<Upstream: Publisher>: Publisher {
        /// The element type delivered to subscribers.
        enum Output {
            /// An upstream element that had no predecessor.
            case first(Upstream.Output)
            /// An upstream element (second value) along with the element
            /// received immediately before it (first value).
            case pair(Upstream.Output, Upstream.Output)
            /// The last upstream element, re-sent when upstream completes.
            case final(Upstream.Output)
        }

        /// Failures are passed through from the upstream publisher unchanged.
        typealias Failure = Upstream.Failure

        /// The publisher whose elements are being paired.
        let upstream: Upstream

        /// Creates a pairing publisher on top of `upstream`.
        init(upstream: Upstream) {
            self.upstream = upstream
        }

        /// Attaches `subscriber` by subscribing an intermediate `Inner`
        /// subscriber to `upstream`; `Inner` forwards the tagged elements.
        func receive<S: Subscriber>(subscriber: S) where S.Failure == Failure, S.Input == Output {
            let inner = Inner(downstream: subscriber)
            upstream.subscribe(inner)
        }
    }
}
extension Publishers.Pair {
    /// Bridges the upstream publisher to the downstream subscriber.
    ///
    /// Every upstream element produces exactly one downstream element
    /// (`.first` for the very first element, `.pair` afterwards). When the
    /// upstream completes after having produced at least one element, one extra
    /// `.final` element is emitted before the completion is forwarded.
    ///
    /// Because `.final` is an element the upstream never accounted for, this
    /// type interposes itself as the downstream's `Subscription` and tracks the
    /// downstream's unfulfilled demand. If the downstream has no outstanding
    /// demand when the upstream completes, `.final` and the completion are held
    /// back until the downstream requests more, so the downstream never
    /// receives more elements than it asked for.
    ///
    /// Like the implementation it replaces, this type performs no
    /// synchronization of its own.
    private final class Inner<Downstream: Subscriber>: Subscriber, Subscription where Downstream.Input == Output, Downstream.Failure == Failure {
        let combineIdentifier = CombineIdentifier()
        private let downstream: Downstream

        /// The subscription to the upstream publisher; released on cancel/completion.
        private var upstreamSubscription: Subscription?
        /// Demand requested by the downstream that has not yet been satisfied.
        private var pendingDemand: Subscribers.Demand = .none
        /// Upstream completion that is waiting for demand so `.final` can precede it.
        private var deferredCompletion: Subscribers.Completion<Upstream.Failure>?
        /// Set once the downstream has been completed or has cancelled.
        private var isTerminated = false
        /// The most recently received upstream element, if any.
        var previousInput: Upstream.Output?

        init(downstream: Downstream) {
            self.downstream = downstream
        }

        // MARK: Subscriber

        /// Stores the upstream subscription and hands `self` to the downstream
        /// so that all demand flows through `request(_:)`.
        func receive(subscription: Subscription) {
            upstreamSubscription = subscription
            downstream.receive(subscription: self)
        }

        /// Forwards `input` downstream as `.first` or `.pair` and returns the
        /// additional demand the downstream asked for.
        func receive(_ input: Upstream.Output) -> Subscribers.Demand {
            guard !isTerminated else { return .none }

            let element: Output
            if let previous = previousInput {
                element = .pair(previous, input)
            } else {
                element = .first(input)
            }
            // Record the element before calling out, so a synchronous cancel
            // from the downstream is not undone afterwards.
            previousInput = input

            // This delivery consumes one unit of the downstream's demand.
            if pendingDemand > .none {
                pendingDemand -= 1
            }
            let additional = downstream.receive(element)
            pendingDemand += additional
            return additional
        }

        /// Forwards the completion, preceded by `.final` when an element was
        /// seen. Both are deferred if the downstream currently has no demand.
        func receive(completion: Subscribers.Completion<Upstream.Failure>) {
            guard !isTerminated else { return }
            upstreamSubscription = nil

            if previousInput != nil && pendingDemand == .none {
                // Emitting `.final` now would exceed the downstream's demand;
                // wait for the next `request(_:)`.
                deferredCompletion = completion
            } else {
                finish(with: completion)
            }
        }

        // MARK: Subscription

        /// Records the downstream's demand and either forwards it upstream or,
        /// if the upstream has already completed, flushes the deferred
        /// `.final` element and completion.
        func request(_ demand: Subscribers.Demand) {
            guard !isTerminated else { return }
            pendingDemand += demand

            if let completion = deferredCompletion {
                if pendingDemand > .none {
                    finish(with: completion)
                }
                return
            }
            upstreamSubscription?.request(demand)
        }

        /// Cancels the upstream subscription and drops all buffered state.
        func cancel() {
            isTerminated = true
            deferredCompletion = nil
            previousInput = nil
            upstreamSubscription?.cancel()
            upstreamSubscription = nil
        }

        // MARK: Helpers

        /// Emits `.final` (when an element was seen) followed by `completion`.
        private func finish(with completion: Subscribers.Completion<Upstream.Failure>) {
            isTerminated = true
            deferredCompletion = nil
            if let last = previousInput {
                previousInput = nil
                // The stream is ending, so any further demand is irrelevant.
                _ = downstream.receive(.final(last))
            }
            downstream.receive(completion: completion)
        }
    }
}
extension Publishers {
    /// The publisher produced by composing `scan` and `map` to pair elements.
    ///
    /// The scan stage carries a two-slot window of optionals; the map stage
    /// turns each window into `(previous, current)`, where `previous` is
    /// optional and `current` is not.
    typealias Pair<Upstream: Publisher> = Publishers.Map<
        Publishers.Scan<Upstream, (Upstream.Output?, Upstream.Output?)>,
        (Upstream.Output?, Upstream.Output)
    >
}
extension Publisher {
    /// Publishes each element together with the element that came before it.
    ///
    /// The first tuple has `nil` in its first position because no earlier
    /// element exists; every later tuple carries the preceding element there.
    ///
    /// - Returns: A publisher of `(previous, current)` tuples.
    func pair() -> Publishers.Pair<Self> {
        // Sliding window of the two most recent elements; both slots start empty.
        let emptyWindow: (Output?, Output?) = (nil, nil)
        return scan(emptyWindow) { window, element in
            // Shift the newest element into the "previous" slot.
            (window.1, element)
        }
        .map { window in
            // Slot 1 is written by every scan step, so it is never nil here.
            (window.0, window.1!)
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment