Skip to content

Instantly share code, notes, and snippets.

@ryotapoi
Last active July 12, 2020 17:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ryotapoi/ce3730a0c9219c4ae10a31d57bb69b2f to your computer and use it in GitHub Desktop.
Save ryotapoi/ce3730a0c9219c4ae10a31d57bb69b2f to your computer and use it in GitHub Desktop.
Combine.Subjectを実装してCurrentValueSubjectのクローンを作る
import Combine
class MySubject<Output, Failure : Error> : Combine.Subject {
var value: Output {
didSet {
// 完了していなければ値を通知する
guard completion == nil else { return }
subscriptions.forEach { $0.receive(value) }
}
}
private var completion: Subscribers.Completion<Failure>? {
didSet {
// 完了を2回送らないように注意
guard oldValue == nil, let completion = completion else { return }
subscriptions.forEach { $0.receive(completion: completion) }
}
}
private var subscriptions: [MySubjectSubscription<Output, Failure>] = []
init(_ value: Output) {
self.value = value
}
// Publisher
func receive<S>(
subscriber: S
) where S : Combine.Subscriber, Failure == S.Failure, Output == S.Input {
let subscription = MySubjectSubscription(subscriber: subscriber, cancel: cancel(subscription:))
subscriber.receive(subscription: subscription)
subscriptions.append(subscription)
if let completion = completion {
// 完了していれば即時通知
subscription.receive(completion: completion)
} else {
// CurrentValueSubjectは値を保持しているので、完了していなければ即時通知
subscription.receive(value)
}
}
// Subject
func send(_ value: Output) {
guard completion == nil else { return }
self.value = value
}
func send(completion: Subscribers.Completion<Failure>) {
// 完了しているSubjectをsubscribeした場合、即時完了通知する必要があるため、completionは取っておく。
self.completion = completion
}
func send(subscription: Combine.Subscription) {
// 値が3個でいいなど決めることができないので、無限に要求する
subscription.request(.unlimited)
}
// for Subscription
private func cancel(subscription: MySubjectSubscription<Output, Failure>) {
guard let index = subscriptions.firstIndex(of: subscription) else { return }
subscriptions.remove(at: index)
}
}
class MySubjectSubscription<Output, Failure : Error> : Combine.Subscription, Equatable {
private var demand: Subscribers.Demand = .none
private var subscriber: AnySubscriber<Output, Failure>
private var cancelSubscription: (MySubjectSubscription<Output, Failure>) -> Void
init<S>(
subscriber: S,
cancel: @escaping (MySubjectSubscription<Output, Failure>) -> Void
) where S : Combine.Subscriber, Output == S.Input, Failure == S.Failure {
self.subscriber = .init(subscriber)
self.cancelSubscription = cancel
}
// Subscription
func request(_ demand: Subscribers.Demand) {
// request(_:)が複数回呼ばれることがあるか不明。
// self.demand += demand とどちらがいいかよくわからず。
self.demand = demand
}
// Cancellable
func cancel() {
cancelSubscription(self)
}
// for Subscriber
func receive(_ value: Output) {
guard demand != .none else { return }
demand -= 1 // ひとつ通知するからひとつ減らす
demand += subscriber.receive(value) // 追加で要求された数をdemandに足しておく
}
func receive(completion: Subscribers.Completion<Failure>) {
subscriber.receive(completion: completion)
}
static func == (lhs: MySubjectSubscription<Output, Failure>, rhs: MySubjectSubscription<Output, Failure>) -> Bool {
lhs.combineIdentifier == rhs.combineIdentifier
}
}
do {
let subject: MySubject<Int, Error> = .init(1)
subject.send(completion: .finished)
// completionしているとsinkで即competionが呼ばれる
let cancelable = subject.sink(receiveCompletion: { (completion) in
completion
return
}, receiveValue: { (value) in
// completionの後は呼ばれない
value
return
})
}
do {
let subject: MySubject<Int, Error> = .init(1)
let cancelable = subject.sink(receiveCompletion: { (completion) in
print(completion)
}, receiveValue: { (value) in
// 1回だけ実行される
// completionの後は呼ばれない
value
return
})
subject.send(completion: .finished)
// completionしていてもvalueは取れる
subject.value // => 1
subject.send(2)
// completionしているとsendは無視される
subject.value // => 1
subject.value = 3
// completionしていてもvalueは有効。
// valueはCombine.SubjectではなくCurrentValueSubjectの機能だから?
subject.value // => 3
}
do {
let subject1: MySubject<Int, Error> = .init(1)
let subject2: MySubject<Int, Error> = .init(100)
subject1.value // => 1
subject2.value // => 100
let cancelable = subject1.subscribe(subject2)
// currentValueSubjectはsubscribeした瞬間現在の値が流れるので、subject1,2は同じ値になる
subject1.value // => 1
subject2.value // => 1
subject1.send(2)
// 上流であるsubject1に値を流すとsubject2に値が流れる
subject1.value // => 2
subject2.value // => 2
subject2.send(200)
// 下流であるsubject2に値を流してもsubject1には影響しない
subject1.value // => 2
subject2.value // => 200
}
do {
let subject1: MySubject<Int, Error> = .init(1)
let subject2: MySubject<Int, Error> = .init(100)
let cancelable = subject1.subscribe(subject2)
subject1.value // => 1
subject2.value // => 1
cancelable.cancel()
subject1.send(2)
// cancelすればsubject1からsubject2に値は流れなくなる
subject1.value // => 2
subject2.value // => 1
}
do {
let subject1: MySubject<Int, Error> = .init(1)
let subject2: MySubject<Int, Error> = .init(100)
let cancelable = subject1.subscribe(subject2)
subject1.value // => 1
subject2.value // => 1
subject1.send(completion: .finished)
// subject1を完了するとsubject2も完了する
// subject2へのsendが無効になる
subject2.send(200)
subject1.value // => 1
subject2.value // => 1
}
do {
let subject1: MySubject<Int, Error> = .init(1)
let subject2: MySubject<Int, Error> = .init(100)
let cancelable = subject1.subscribe(subject2)
subject1.value // => 1
subject2.value // => 1
cancelable.cancel()
subject1.send(completion: .finished)
// cancelすればcompletionの流れない
subject2.send(200)
subject1.value // => 1
subject2.value // => 200
}
do {
let subject1: MySubject<Int, Error> = .init(1)
let subject2: MySubject<Int, Error> = .init(100)
let cancellable = subject1.subscribe(subject2)
subject1.value // => 1
subject2.value // => 1
subject2.send(completion: .finished)
subject1.send(2)
// subject2が完了しているので、subject1からsubject2に値を流す設定でもsubject2には影響しない
subject1.value // => 2
subject2.value // => 1
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment