Skip to content

Instantly share code, notes, and snippets.

@dhoerl
Last active September 7, 2021 20:04
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dhoerl/57fe946f95b647184c38b5c3942fc32c to your computer and use it in GitHub Desktop.
Save dhoerl/57fe946f95b647184c38b5c3942fc32c to your computer and use it in GitHub Desktop.
Combine Sample Publisher for Medium Article
enum SPErrors: Error {
case inputStringWasEmpty
}
struct StringPublisher: Publisher {
typealias Output = [Character]
typealias Failure = Error
private let data: [Character]
init(string: String) {
self.data = string.map({$0})
}
func receive<S>(subscriber: S) where
S: Subscriber,
S.Failure == Self.Failure,
S.Input == Self.Output
{
let subscription = StringPublisherSubscription(subscriber: subscriber, data: data)
subscriber.receive(subscription: subscription)
}
final class StringPublisherSubscription<S>: Subscription where
S: Subscriber,
S.Input == [Character],
S.Failure == Error
{
private var subscriber: S?
private var data: [Character]
private var runningDemand: Subscribers.Demand = .max(0)
private var isFinished = false
private var isProcessingRequest = false // make this Atomic to be threadsafe
init(subscriber: S, data: [Character]) {
self.subscriber = subscriber
self.data = data
}
func request(_ demand: Subscribers.Demand) {
guard !isFinished else { return }
guard let subscriber = subscriber else { return }
guard data.count > 0 else { return sendError(.inputStringWasEmpty) }
runningDemand += demand
if isProcessingRequest == true {
return
} else {
isProcessingRequest = true
}
while runningDemand > 0 && !data.isEmpty {
let count = computeSendCount()
let tempData: [Character] = Array( data.prefix(upTo: count) )
let stillDesired = subscriber.receive(tempData)
// Only update counts and data AFTER sending receive
data.removeSubrange(0..<count)
runningDemand -= count
if let runningDesired = runningDemand.max, let stillDesired = stillDesired.max {
assert(runningDesired == stillDesired)
}
}
if data.isEmpty {
subscriber.receive(completion: .finished)
isFinished = true
}
isProcessingRequest = false
}
private func sendError(_ error: SPErrors) {
subscriber?.receive(completion: .failure(error))
}
private func computeSendCount() -> Int {
let count: Int
if let demand = runningDemand.max {
count = Swift.min(data.count, demand)
} else {
count = data.count
}
return count
}
func cancel() {
isFinished = true
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment