Skip to content

Instantly share code, notes, and snippets.

@SeRG1k17
Created September 29, 2020 11:23
Show Gist options
  • Save SeRG1k17/a6492bc8d2203bd2bf4f96c9cfa4a578 to your computer and use it in GitHub Desktop.
Save SeRG1k17/a6492bc8d2203bd2bf4f96c9cfa4a578 to your computer and use it in GitHub Desktop.
Swift combine StartWith
extension Publisher {
func unwrap() -> Publishers.CompactMap<Self, Self.Output> {
return compactMap({ $0 })
}
func startWith1(_ value: Self.Output) -> Publishers.Merge<AnyPublisher<Self.Output, Self.Failure>, Self> {
return Publishers.Merge(Just(value).setFailureType(to: Self.Failure.self).eraseToAnyPublisher(), self)
}
func startWith(_ value: Self.Output) -> Publishers.StartWith<Self> {
return Publishers.StartWith(upstream: self, output: value)
}
}
extension Publishers {
struct StartWith<Upstream>: Publisher where Upstream : Publisher {
typealias Output = Upstream.Output
typealias Failure = Upstream.Failure
let output: Output
/// The publisher from which this publisher receives elements.
let upstream: Upstream
public init(upstream: Upstream, output: Self.Output) {
self.upstream = upstream
self.output = output
}
func receive<S>(subscriber: S) where S : Combine.Subscriber, Self.Failure == S.Failure, Self.Output == S.Input {
let subs = Subscription(upstream: upstream, downstream: subscriber, output: output)
subscriber.receive(subscription: subs)
}
}
}
extension Publishers.StartWith {
class Subscription<Downstream: Combine.Subscriber>: Combine.Subscription where Downstream.Input == Output, Downstream.Failure == Failure {
private var sink: Subscriber<Upstream, Downstream>?
private let output: Upstream.Output
init(upstream: Upstream, downstream: Downstream, output: Upstream.Output) {
self.sink = Subscriber(upstream: upstream, downstream: downstream)
self.output = output
}
func request(_ demand: Subscribers.Demand) {
_ = sink?.receive(output)
}
func cancel() {
sink = nil
}
}
}
extension Publishers.StartWith {
class Subscriber<Upstream, Downstream>: Combine.Subscriber where
Upstream : Publisher,
Downstream : Combine.Subscriber,
Upstream.Output == Downstream.Input,
Upstream.Failure == Downstream.Failure {
typealias Input = Upstream.Output
typealias Failure = Upstream.Failure
private var downstream: Downstream
private var demand = Subscribers.Demand.unlimited
init(upstream: Upstream, downstream: Downstream) {
self.downstream = downstream
upstream.subscribe(self)
}
func receive(subscription: Combine.Subscription) {
subscription.request(.unlimited)
}
func receive(_ input: Upstream.Output) -> Subscribers.Demand {
return downstream.receive(input)
}
func receive(completion: Subscribers.Completion<Upstream.Failure>) {
downstream.receive(completion: completion)
}
}
}
final class IntSubscriber: Subscriber {
typealias Input = Int
typealias Failure = Never
func receive(subscription: Subscription) {
subscription.request(.max(2))
}
func receive(_ input: Int) -> Subscribers.Demand {
print("Received value", input)
return .none
}
func receive(completion: Subscribers.Completion<Never>) {
print("Received completion", completion)
}
}
let publisher = PassthroughSubject<Int, Never>()
let subscriber = IntSubscriber()
publisher
.startWith(10)
.subscribe(subscriber)
publisher.send(7)
publisher.send(15)
publisher.send(20)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment