Skip to content

Instantly share code, notes, and snippets.

@uliwitness
Last active August 21, 2019 11:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save uliwitness/394b308b4aa457293f679560edcdbd5e to your computer and use it in GitHub Desktop.
Save uliwitness/394b308b4aa457293f679560edcdbd5e to your computer and use it in GitHub Desktop.
class Observer: Subscription {
public func unsubscribe() {
...
}
public func request(_ demand: Subscribers.Demand) {
// We only inform on changes.
print("Demand for \(demand) received.")
}
public func cancel() {
print("Unsubscribed.")
unsubscribe()
}
}
public class MyPublisher: Publisher {
public typealias Output = [MyThing]
public typealias Failure = Never
weak var thingManager: ThingManager?
var subscribers = [Int: Observer]()
var subscriberIdSeed: Int = 0
init(thingManager: ThingManager) {
self.thingManager = thingManager
}
public func receive<S>(subscriber: S) where S : Subscriber, MyPublisher.Failure == S.Failure, MyPublisher.Output == S.Input {
guard let thingManager = thingManager else { fatalError("Owning manager went out of scope. Should never happen.") }
subscriberIdSeed += 1
let subscriberId = subscriberIdSeed
let observer = thingManager.subscribe(dispatchQueue: queue, resultHandler: { things in
Swift.print("notified.")
let demand = subscriber.receive(things)
switch demand {
case .unlimited:
Swift.print("\tunlimited.")
break
case .none:
Swift.print("\tnone -> unsubscribed.")
self.subscribers[subscriberId] = nil
subscriber.receive(completion: .finished)
default:
break
}
})
subscribers[subscriberId] = observer
subscriber.receive(subscription: observer)
}
}
class TestSubscriber: Subscriber {
typealias Input = [MyThing]
typealias Failure = Never
internal var things = [MyThing]()
internal var completed = false
func receive(subscription: Subscription) {
print("Subscription started.")
}
func receive(_ input: Input) -> Subscribers.Demand {
print("Data Received \(input).")
things = input
return .unlimited
}
func receive(completion: Subscribers.Completion<Failure>) {
print("Subscription completed.")
completed = true
}
}
@uliwitness
Copy link
Author

Anyone know how to correctly implement async Publisher + Subscriber in Combine (so pushes instead of pulls)? I have a ThingManager I can subscribe to and it’ll call a closure I provide. Is this the right way?

In particular, am I handling the result from Subscriber.receive() correctly?

@lbdl
Copy link

lbdl commented Aug 21, 2019

Yes using a closure you provide is the way that I implement this kind of pattern otherwise you need to keep a reference to the caller

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment