Skip to content

Instantly share code, notes, and snippets.

@natpenguin
Created September 4, 2020 09:13
Show Gist options
  • Save natpenguin/f52ceeeacb329059fe60e61aa337dee9 to your computer and use it in GitHub Desktop.
Save natpenguin/f52ceeeacb329059fe60e61aa337dee9 to your computer and use it in GitHub Desktop.
An combine operator that pass the stream event with previous stream event
extension Publishers {
struct Previous<Upstream: Publisher>: Publisher {
typealias Output = (Upstream.Output?, Upstream.Output)
typealias Failure = Upstream.Failure
private let upstream: Upstream
init(upstream: Upstream) {
self.upstream = upstream
}
func receive<S>(subscriber: S) where S: Subscriber, Self.Failure == S.Failure, Self.Output == S.Input {
subscriber.receive(subscription: Subscription(upstream: upstream, downstream: subscriber))
}
}
}
extension Publishers.Previous {
class Subscription<Downstream: Subscriber>: Combine.Subscription where (Upstream.Output?, Upstream.Output) == Downstream.Input, Downstream.Failure == Failure {
private var sink: Sink<Upstream, Downstream>?
init(upstream: Upstream, downstream: Downstream) {
sink = .init(upstream: upstream, downstream: downstream)
}
func request(_: Subscribers.Demand) {}
func cancel() {
sink = nil
}
}
class Sink<Upstream: Publisher, Downstream: Subscriber>: Subscriber where (Upstream.Output?, Upstream.Output) == Downstream.Input, Upstream.Failure == Downstream.Failure {
typealias Input = Upstream.Output
typealias Failure = Upstream.Failure
private var downstream: Downstream
private var previousInput: Input?
init(upstream: Upstream, downstream: Downstream) {
self.downstream = downstream
upstream.subscribe(self)
}
func receive(_ input: Input) -> Subscribers.Demand {
let demand = downstream.receive((previousInput, input))
previousInput = input
return demand
}
func receive(completion: Subscribers.Completion<Upstream.Failure>) {
downstream.receive(completion: completion)
}
func receive(subscription: Combine.Subscription) {
subscription.request(.unlimited)
}
}
}
extension Publisher {
func previous() -> Publishers.Previous<Self> {
Publishers.Previous(upstream: self)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment