Skip to content

Instantly share code, notes, and snippets.

@dhoerl
Last active March 31, 2020 19:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dhoerl/3acfadfa3de1fc97f2bb5c737ebb3d75 to your computer and use it in GitHub Desktop.
Save dhoerl/3acfadfa3de1fc97f2bb5c737ebb3d75 to your computer and use it in GitHub Desktop.
Combine Complex Publisher for Medium
/// A Combine publisher that upper-cases the characters emitted by an upstream publisher.
///
/// Upstream values are arrays of `Character`. Every character is upper-cased (which may
/// expand it into several characters, e.g. "ß" becomes "SS"), buffered, and then forwarded
/// downstream in chunks. Downstream demand is interpreted as a number of *characters*:
/// a subscriber that requests `.max(3)` receives at most three characters, packed into a
/// single `[Character]` value.
struct UpperCasePublisher: Publisher {
    typealias Output = [Character]
    typealias Failure = Error

    /// The publisher whose characters are upper-cased.
    let upstreamPublisher: AnyPublisher<Output, Error>

    /// Creates a publisher that upper-cases everything `upstream` emits.
    init(upstream: AnyPublisher<Output, Error>) {
        self.upstreamPublisher = upstream
    }

    /// Attaches `subscriber` by inserting an `UpperCaseSubscription` between it and the upstream.
    func receive<S>(subscriber: S) where
        S: Subscriber,
        S.Failure == Self.Failure,
        S.Input == Self.Output
    {
        let subscription = UpperCaseSubscription(subscriber: subscriber, upstream: upstreamPublisher)
        upstreamPublisher.subscribe(subscription)
    }

    /// Sits between the upstream publisher and the downstream subscriber.
    ///
    /// It is a `Subscriber` towards the upstream (it pulls one value at a time and buffers
    /// the upper-cased characters) and a `Subscription` towards the downstream (it hands out
    /// buffered characters according to the demand the downstream has signalled).
    ///
    /// NOTE: none of the state below is synchronized; callers must not use this object from
    /// several threads at once.
    final class UpperCaseSubscription<S, P: Publisher>: Subscription, Subscriber where
        S: Subscriber,
        S.Input == Output,
        S.Failure == Error
    {
        typealias Input = P.Output // for Subscriber
        typealias Failure = P.Failure // for Subscriber

        /// Upper-cased characters that have not been delivered downstream yet.
        private var data: [Character] = []
        /// Re-entrancy guard for `drain()`; not atomic, so not thread-safe.
        private var isDraining = false

        // Upstream related
        private var upstreamSubscription: Subscription?
        private var isUpstreamFinished = false

        // Downstream related
        /// Set to `nil` once downstream has been completed or has cancelled, which both
        /// releases the subscriber and guarantees that at most one completion is sent.
        private var downstreamSubscriber: S?
        private var isDownstreamCancelled = false
        /// Number of characters downstream is still willing to accept.
        private var runningDemand: Subscribers.Demand = .none

        /// - Parameters:
        ///   - subscriber: The downstream subscriber that receives the upper-cased characters.
        ///   - upstream: Not stored; it only lets the compiler infer `P`.
        init(subscriber: S, upstream: P) {
            self.downstreamSubscriber = subscriber
        }

        // MARK: - Subscription (called by downstream)

        /// Adds `demand` (a character count) to the outstanding demand and delivers whatever
        /// buffered characters that demand now allows.
        func request(_ demand: Subscribers.Demand) {
            guard !isDownstreamCancelled, downstreamSubscriber != nil else { return }
            runningDemand += demand
            drain()
        }

        /// Stops delivery, cancels the upstream subscription, and releases all held state.
        func cancel() {
            isDownstreamCancelled = true
            upstreamSubscription?.cancel()
            upstreamSubscription = nil
            downstreamSubscriber = nil
            data.removeAll()
        }

        // MARK: - Subscriber (called by upstream)

        /// Stores the upstream subscription, hands `self` to downstream as its subscription,
        /// and asks upstream for the first value.
        func receive(subscription: Subscription) {
            // Store first: downstream may call `cancel()` synchronously from inside
            // `receive(subscription:)`, and that cancel has to reach the upstream.
            upstreamSubscription = subscription
            downstreamSubscriber?.receive(subscription: self)
            guard !isDownstreamCancelled else { return }
            subscription.request(.max(1))
        }

        /// Upper-cases and buffers `input`, delivers what outstanding demand allows, and
        /// asks upstream for one more value.
        func receive(_ input: Input) -> Subscribers.Demand {
            guard let characters = input as? [Character] else {
                fatalError("UpperCaseSubscription requires an upstream whose Output is [Character]")
            }
            guard !isDownstreamCancelled, downstreamSubscriber != nil else { return .none }

            for character in characters {
                // `uppercased()` returns a String because one character may expand to several.
                data.append(contentsOf: character.uppercased())
            }
            // Downstream may already have demand waiting for exactly this data.
            drain()

            // The returned demand is added to upstream's outstanding demand, so this alone
            // keeps one value in flight; an extra `request(_:)` call would double it.
            return .max(1)
        }

        /// Forwards upstream completion: a failure is passed on immediately, a normal finish
        /// is passed on once every buffered character has been delivered.
        func receive(completion: Subscribers.Completion<P.Failure>) {
            isUpstreamFinished = true
            upstreamSubscription = nil
            switch completion {
            case .finished:
                drain() // sends `.finished` right away when the buffer is already empty
            case .failure(let error):
                completeDownstream(with: .failure(error))
            }
        }

        // MARK: - Private helpers

        /// Number of buffered characters that may be sent now given the outstanding demand.
        private func computeSendCount() -> Int {
            guard let limit = runningDemand.max else { return data.count } // unlimited demand
            return Swift.min(data.count, limit)
        }

        /// Sends buffered characters downstream while there is demand, then finishes
        /// downstream if upstream has finished and the buffer is empty.
        private func drain() {
            // Downstream may call `request(_:)` (or upstream may deliver) from inside
            // `receive(_:)`; the loop below already picks up those changes.
            guard !isDraining else { return }
            isDraining = true
            defer { isDraining = false }

            while !isDownstreamCancelled, runningDemand > 0, !data.isEmpty,
                  let subscriber = downstreamSubscriber {
                let count = computeSendCount()
                let chunk = Array(data.prefix(count))
                // Update the bookkeeping before calling out so that a re-entrant
                // `request(_:)` or `cancel()` always observes a consistent state.
                data.removeFirst(count)
                runningDemand -= count
                // The value returned by `receive(_:)` is additional demand.
                runningDemand += subscriber.receive(chunk)
            }

            if isUpstreamFinished && data.isEmpty {
                completeDownstream(with: .finished)
            }
        }

        /// Sends `completion` downstream at most once and releases the subscriber and buffer.
        private func completeDownstream(with completion: Subscribers.Completion<S.Failure>) {
            guard !isDownstreamCancelled, let subscriber = downstreamSubscriber else { return }
            downstreamSubscriber = nil
            data.removeAll()
            subscriber.receive(completion: completion)
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment