Skip to content

Instantly share code, notes, and snippets.

@RoshanNindrai
Last active November 6, 2019 04:10
Show Gist options
  • Save RoshanNindrai/98218407d232addc4b44dc67dbef171c to your computer and use it in GitHub Desktop.
Save RoshanNindrai/98218407d232addc4b44dc67dbef171c to your computer and use it in GitHub Desktop.
Sample Combine Operator (Purely for learning purposes & This is not thread safe)
import Combine
public extension Publishers {
final class Pulse<Upstream: Publisher>: Publisher {
public typealias Output = Upstream.Output
public typealias Failure = Upstream.Failure
private let interval: Int
private let upstream: Upstream
public init(_ interval: Int, upstream: Upstream) {
self.interval = interval
self.upstream = upstream
}
public func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
let subscription = Inner(upstream: upstream, downstream: subscriber, interval: interval)
subscriber.receive(subscription: subscription)
}
}
}
extension Publishers.Pulse {
public class Inner<UpStream: Publisher, DownStream: Subscriber>: Subscription, Subscriber where
DownStream.Input == UpStream.Output, DownStream.Failure == UpStream.Failure {
public typealias Input = [UpStream.Output]
public typealias Failure = UpStream.Failure
private var upstream: UpStream?
private var downstream: DownStream?
private var didConnect: Bool = false
private let interval: Int
init(upstream: UpStream, downstream: DownStream, interval: Int) {
self.upstream = upstream
self.downstream = downstream
self.interval = interval
}
public func request(_ demand: Subscribers.Demand) {
guard !didConnect, demand == .unlimited else { return }
upstream?.collect(interval).subscribe(self)
didConnect = true
}
public func cancel() {
downstream = nil
upstream = nil
}
public func receive(subscription: Subscription) {
subscription.request(.unlimited)
}
public func receive(_ input: [UpStream.Output]) -> Subscribers.Demand {
guard input.count == interval, let output = input.last else {
return .none
}
downstream?.receive(output)
return .unlimited
}
public func receive(completion: Subscribers.Completion<UpStream.Failure>) {
downstream?.receive(completion: completion)
}
}
}
extension Publisher {
public func pulse(_ interval: Int) -> AnyPublisher<Output, Failure> {
return Publishers.Pulse<Self>(interval, upstream: self).eraseToAnyPublisher()
}
}
let collectionPublisher = (1...10).publisher
let cancellable = collectionPublisher.pulse(2).sink { print($0) }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment