Last active
July 12, 2020 17:07
-
-
Save ryotapoi/ce3730a0c9219c4ae10a31d57bb69b2f to your computer and use it in GitHub Desktop.
Combine.Subjectを実装してCurrentValueSubjectのクローンを作る
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Combine | |
class MySubject<Output, Failure : Error> : Combine.Subject { | |
var value: Output { | |
didSet { | |
// 完了していなければ値を通知する | |
guard completion == nil else { return } | |
subscriptions.forEach { $0.receive(value) } | |
} | |
} | |
private var completion: Subscribers.Completion<Failure>? { | |
didSet { | |
// 完了を2回送らないように注意 | |
guard oldValue == nil, let completion = completion else { return } | |
subscriptions.forEach { $0.receive(completion: completion) } | |
} | |
} | |
private var subscriptions: [MySubjectSubscription<Output, Failure>] = [] | |
init(_ value: Output) { | |
self.value = value | |
} | |
// Publisher | |
func receive<S>( | |
subscriber: S | |
) where S : Combine.Subscriber, Failure == S.Failure, Output == S.Input { | |
let subscription = MySubjectSubscription(subscriber: subscriber, cancel: cancel(subscription:)) | |
subscriber.receive(subscription: subscription) | |
subscriptions.append(subscription) | |
if let completion = completion { | |
// 完了していれば即時通知 | |
subscription.receive(completion: completion) | |
} else { | |
// CurrentValueSubjectは値を保持しているので、完了していなければ即時通知 | |
subscription.receive(value) | |
} | |
} | |
// Subject | |
func send(_ value: Output) { | |
guard completion == nil else { return } | |
self.value = value | |
} | |
func send(completion: Subscribers.Completion<Failure>) { | |
// 完了しているSubjectをsubscribeした場合、即時完了通知する必要があるため、completionは取っておく。 | |
self.completion = completion | |
} | |
func send(subscription: Combine.Subscription) { | |
// 値が3個でいいなど決めることができないので、無限に要求する | |
subscription.request(.unlimited) | |
} | |
// for Subscription | |
private func cancel(subscription: MySubjectSubscription<Output, Failure>) { | |
guard let index = subscriptions.firstIndex(of: subscription) else { return } | |
subscriptions.remove(at: index) | |
} | |
} | |
class MySubjectSubscription<Output, Failure : Error> : Combine.Subscription, Equatable { | |
private var demand: Subscribers.Demand = .none | |
private var subscriber: AnySubscriber<Output, Failure> | |
private var cancelSubscription: (MySubjectSubscription<Output, Failure>) -> Void | |
init<S>( | |
subscriber: S, | |
cancel: @escaping (MySubjectSubscription<Output, Failure>) -> Void | |
) where S : Combine.Subscriber, Output == S.Input, Failure == S.Failure { | |
self.subscriber = .init(subscriber) | |
self.cancelSubscription = cancel | |
} | |
// Subscription | |
func request(_ demand: Subscribers.Demand) { | |
// request(_:)が複数回呼ばれることがあるか不明。 | |
// self.demand += demand とどちらがいいかよくわからず。 | |
self.demand = demand | |
} | |
// Cancellable | |
func cancel() { | |
cancelSubscription(self) | |
} | |
// for Subscriber | |
func receive(_ value: Output) { | |
guard demand != .none else { return } | |
demand -= 1 // ひとつ通知するからひとつ減らす | |
demand += subscriber.receive(value) // 追加で要求された数をdemandに足しておく | |
} | |
func receive(completion: Subscribers.Completion<Failure>) { | |
subscriber.receive(completion: completion) | |
} | |
static func == (lhs: MySubjectSubscription<Output, Failure>, rhs: MySubjectSubscription<Output, Failure>) -> Bool { | |
lhs.combineIdentifier == rhs.combineIdentifier | |
} | |
} | |
do { | |
let subject: MySubject<Int, Error> = .init(1) | |
subject.send(completion: .finished) | |
// completionしているとsinkで即competionが呼ばれる | |
let cancelable = subject.sink(receiveCompletion: { (completion) in | |
completion | |
return | |
}, receiveValue: { (value) in | |
// completionの後は呼ばれない | |
value | |
return | |
}) | |
} | |
do { | |
let subject: MySubject<Int, Error> = .init(1) | |
let cancelable = subject.sink(receiveCompletion: { (completion) in | |
print(completion) | |
}, receiveValue: { (value) in | |
// 1回だけ実行される | |
// completionの後は呼ばれない | |
value | |
return | |
}) | |
subject.send(completion: .finished) | |
// completionしていてもvalueは取れる | |
subject.value // => 1 | |
subject.send(2) | |
// completionしているとsendは無視される | |
subject.value // => 1 | |
subject.value = 3 | |
// completionしていてもvalueは有効。 | |
// valueはCombine.SubjectではなくCurrentValueSubjectの機能だから? | |
subject.value // => 3 | |
} | |
do { | |
let subject1: MySubject<Int, Error> = .init(1) | |
let subject2: MySubject<Int, Error> = .init(100) | |
subject1.value // => 1 | |
subject2.value // => 100 | |
let cancelable = subject1.subscribe(subject2) | |
// currentValueSubjectはsubscribeした瞬間現在の値が流れるので、subject1,2は同じ値になる | |
subject1.value // => 1 | |
subject2.value // => 1 | |
subject1.send(2) | |
// 上流であるsubject1に値を流すとsubject2に値が流れる | |
subject1.value // => 2 | |
subject2.value // => 2 | |
subject2.send(200) | |
// 下流であるsubject2に値を流してもsubject1には影響しない | |
subject1.value // => 2 | |
subject2.value // => 200 | |
} | |
do { | |
let subject1: MySubject<Int, Error> = .init(1) | |
let subject2: MySubject<Int, Error> = .init(100) | |
let cancelable = subject1.subscribe(subject2) | |
subject1.value // => 1 | |
subject2.value // => 1 | |
cancelable.cancel() | |
subject1.send(2) | |
// cancelすればsubject1からsubject2に値は流れなくなる | |
subject1.value // => 2 | |
subject2.value // => 1 | |
} | |
do { | |
let subject1: MySubject<Int, Error> = .init(1) | |
let subject2: MySubject<Int, Error> = .init(100) | |
let cancelable = subject1.subscribe(subject2) | |
subject1.value // => 1 | |
subject2.value // => 1 | |
subject1.send(completion: .finished) | |
// subject1を完了するとsubject2も完了する | |
// subject2へのsendが無効になる | |
subject2.send(200) | |
subject1.value // => 1 | |
subject2.value // => 1 | |
} | |
do { | |
let subject1: MySubject<Int, Error> = .init(1) | |
let subject2: MySubject<Int, Error> = .init(100) | |
let cancelable = subject1.subscribe(subject2) | |
subject1.value // => 1 | |
subject2.value // => 1 | |
cancelable.cancel() | |
subject1.send(completion: .finished) | |
// cancelすればcompletionの流れない | |
subject2.send(200) | |
subject1.value // => 1 | |
subject2.value // => 200 | |
} | |
do { | |
let subject1: MySubject<Int, Error> = .init(1) | |
let subject2: MySubject<Int, Error> = .init(100) | |
let cancellable = subject1.subscribe(subject2) | |
subject1.value // => 1 | |
subject2.value // => 1 | |
subject2.send(completion: .finished) | |
subject1.send(2) | |
// subject2が完了しているので、subject1からsubject2に値を流す設定でもsubject2には影響しない | |
subject1.value // => 2 | |
subject2.value // => 1 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment