Skip to content

Instantly share code, notes, and snippets.

@jasdev
Last active April 20, 2020 15:51
Show Gist options
  • Save jasdev/e79c3abf50e042c8504aee512fec9f21 to your computer and use it in GitHub Desktop.
Save jasdev/e79c3abf50e042c8504aee512fec9f21 to your computer and use it in GitHub Desktop.
`ReplaySubject` with `.receive(subscriber:)`.
final class ReplaySubject<Output, Failure: Error>: Subject {
typealias Output = Output
typealias Failure = Failure
private let bufferSize: Int
private var completion: Subscribers.Completion<Failure>?
private var isActive: Bool {
completion == nil
}
private var buffer = [Output]()
private var subscriptions = [Subscription<AnySubscriber<Output, Failure>>]() /// (3)
init(bufferSize: Int) { /* … */ }
func send(subscription: Combine.Subscription) { /* … */ }
func send(completion: Subscribers.Completion<Failure>) { /* … */ }
func send(_ value: Output) { /* … */ }
func receive<Subscriber: Combine.Subscriber>(
subscriber: Subscriber
) where Failure == Subscriber.Failure, Output == Subscriber.Input {
guard !subscriptions.contains( /// (1) There are more robust ways to handle accidental double subscriptions
/// with the _same_ subscriber (and [the backing CombineExt PR](https://github.com/CombineCommunity/CombineExt/pull/23) takes those steps),
/// but, for now if we receive a subscriber with an already-known [`CombineIdentifer`](https://developer.apple.com/documentation/combine/combineidentifier),
/// we send it an empty subscription.
where: { $0.innerSubscriberIdentifier == subscriber.combineIdentifier }
) else {
subscriber.receive(subscription: Subscriptions.empty)
return
}
let subscription = Subscription(subscriber: AnySubscriber(subscriber)) /// (2) To create a
/// `ReplaySubject.Subscription` instance, we’ll need a bit more than `class`’ default empty initializer.
/// So, let’s hold onto a type-erased subscriber—so we can concretely store it over at `(3)`—and send the
/// subscription along to it with any buffered values and completion event.
subscriptions.append(subscription)
subscriber.receive(subscription: subscription)
buffer.forEach(subscription.forward)
if let completion = completion { subscription.forward(completion: completion) }
}
}
extension ReplaySubject {
final class Subscription<Subscriber: Combine.Subscriber>: Combine.Subscription
where Subscriber.Input == Output, Subscriber.Failure == Failure { /// (4) We’ll bump into needing
/// this constraint, later, but for now, we want to make sure attached subscribers’ input matches
/// `Output` and similarly for `Failure`.
init(subscriber: Subscriber) { /* … */ }
func request(_ demand: Subscribers.Demand) { /* … */ }
func cancel() { /* … */ }
func forward(completion: Subscribers.Completion<Failure>) { /* … */ }
func forward(value: Output) { /* … */ }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment