Last active
November 6, 2019 04:10
-
-
Save RoshanNindrai/98218407d232addc4b44dc67dbef171c to your computer and use it in GitHub Desktop.
Sample Combine Operator (Purely for learning purposes & This is not thread safe)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Combine | |
public extension Publishers { | |
final class Pulse<Upstream: Publisher>: Publisher { | |
public typealias Output = Upstream.Output | |
public typealias Failure = Upstream.Failure | |
private let interval: Int | |
private let upstream: Upstream | |
public init(_ interval: Int, upstream: Upstream) { | |
self.interval = interval | |
self.upstream = upstream | |
} | |
public func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input { | |
let subscription = Inner(upstream: upstream, downstream: subscriber, interval: interval) | |
subscriber.receive(subscription: subscription) | |
} | |
} | |
} | |
extension Publishers.Pulse { | |
public class Inner<UpStream: Publisher, DownStream: Subscriber>: Subscription, Subscriber where | |
DownStream.Input == UpStream.Output, DownStream.Failure == UpStream.Failure { | |
public typealias Input = [UpStream.Output] | |
public typealias Failure = UpStream.Failure | |
private var upstream: UpStream? | |
private var downstream: DownStream? | |
private var didConnect: Bool = false | |
private let interval: Int | |
init(upstream: UpStream, downstream: DownStream, interval: Int) { | |
self.upstream = upstream | |
self.downstream = downstream | |
self.interval = interval | |
} | |
public func request(_ demand: Subscribers.Demand) { | |
guard !didConnect, demand == .unlimited else { return } | |
upstream?.collect(interval).subscribe(self) | |
didConnect = true | |
} | |
public func cancel() { | |
downstream = nil | |
upstream = nil | |
} | |
public func receive(subscription: Subscription) { | |
subscription.request(.unlimited) | |
} | |
public func receive(_ input: [UpStream.Output]) -> Subscribers.Demand { | |
guard input.count == interval, let output = input.last else { | |
return .none | |
} | |
downstream?.receive(output) | |
return .unlimited | |
} | |
public func receive(completion: Subscribers.Completion<UpStream.Failure>) { | |
downstream?.receive(completion: completion) | |
} | |
} | |
} | |
extension Publisher { | |
public func pulse(_ interval: Int) -> AnyPublisher<Output, Failure> { | |
return Publishers.Pulse<Self>(interval, upstream: self).eraseToAnyPublisher() | |
} | |
} | |
let collectionPublisher = (1...10).publisher | |
let cancellable = collectionPublisher.pulse(2).sink { print($0) } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment