Created
June 9, 2021 14:05
-
-
Save simme/958cd9beb545b1d41db2065b337fa732 to your computer and use it in GitHub Desktop.
CloudKit publishers
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import CloudKit | |
extension Error {
    /// Whether this error is a `CKError` whose code is `.serverRecordChanged`.
    var isCloudKitConflictError: Bool {
        guard let ckError = self as? CKError else { return false }
        return ckError.code == .serverRecordChanged
    }

    /// Whether this error is a `CKError` whose code is `.zoneNotFound` or `.userDeletedZone`.
    var isCloudKitZoneDeletedError: Bool {
        guard let ckError = self as? CKError else { return false }
        switch ckError.code {
        case .zoneNotFound, .userDeletedZone:
            return true
        default:
            return false
        }
    }

    /// Whether this error is a `CKError` whose code is `.changeTokenExpired`.
    var isCloudKitTokenExpiredError: Bool {
        guard let ckError = self as? CKError else { return false }
        return ckError.code == .changeTokenExpired
    }

    /// Hands the conflicting records carried by a conflict error to `resolver`.
    ///
    /// - Parameter resolver: A closure that reconciles the two versions of the record.
    /// - Parameter client: The record stored under `CKRecordChangedErrorClientRecordKey`.
    /// - Parameter server: The record stored under `CKRecordChangedErrorServerRecordKey`.
    ///
    /// - Returns: Whatever `resolver` returns, or `nil` if this is not a conflict error or either record is
    ///            missing from the error's `userInfo`.
    func resolveConflict(with resolver: (_ client: CKRecord, _ server: CKRecord) -> CKRecord?) -> CKRecord? {
        guard isCloudKitConflictError, let conflict = self as? CKError else { return nil }

        let details = conflict.userInfo
        guard let mine = details[CKRecordChangedErrorClientRecordKey] as? CKRecord else { return nil }
        guard let theirs = details[CKRecordChangedErrorServerRecordKey] as? CKRecord else { return nil }

        return resolver(mine, theirs)
    }

    /// Looks up the retry delay attached to the error, if any.
    ///
    /// - Returns: The `retryAfterSeconds` value when the error is a `CKError` that carries one, otherwise `nil`.
    func delayIfRetryPossible() -> Double? {
        (self as? CKError)?.retryAfterSeconds
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import CloudKit | |
import Combine | |
/// Asks a container for the current user's CloudKit account status.
///
/// The request is wrapped in `Deferred`, so `accountStatus` is not called until the publisher is subscribed to.
///
/// - Parameter container: The container whose account status is queried.
///
/// - Returns: A publisher that emits the account status, or fails with the error reported by the container.
func getAccountStatus(for container: CKContainer) -> AnyPublisher<CKAccountStatus, Error> {
    let deferredStatus = Deferred {
        Future<CKAccountStatus, Error> { promise in
            container.accountStatus { status, error in
                guard let error = error else {
                    promise(.success(status))
                    return
                }
                promise(.failure(error))
            }
        }
    }
    return deferredStatus.eraseToAnyPublisher()
}
extension CKAccountStatus {
    /// Whether the status is `.available`, i.e. the user has a usable CloudKit account.
    var hasAccount: Bool {
        switch self {
        case .available:
            return true
        default:
            return false
        }
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import CloudKit | |
import Combine | |
/// A publisher that reports changes in a CloudKit database as a stream of `Action` values.
///
/// The publisher's failure type is `Never`; problems are delivered as `Action.unrecoverableError` values rather
/// than as a failure completion. Each subscriber gets its own subscription, and work starts once that subscriber
/// requests demand.
struct FetchDatabaseChangesPublisher: Publisher {
    typealias Output = Action
    typealias Failure = Never

    // MARK: Types

    /// The events emitted by the publisher.
    enum Action {
        /// A record zone was reported as changed.
        case recordZoneChanged(CKRecordZone.ID)
        /// A record zone was reported as deleted.
        case recordZoneDeleted(CKRecordZone.ID)
        /// A record zone was reported as purged.
        case recordZonePurged(CKRecordZone.ID)
        /// A new database change token was received.
        case changeTokenUpdated(CKServerChangeToken)
        /// An error occurred that the subscription did not recover from.
        case unrecoverableError(Error)
    }

    // MARK: Properties

    /// The CloudKit database to fetch changes from.
    private let database: CKDatabase

    /// The queue that executes the fetch operations.
    private let queue: OperationQueue

    /// The database change token to start fetching from, or `nil` to start without one.
    private let changeToken: CKServerChangeToken?

    // MARK: Initialization

    /// Creates a new `FetchDatabaseChangesPublisher`.
    ///
    /// - Parameter database: The CloudKit database to fetch changes from.
    /// - Parameter queue: The operation queue that executes the fetch operations.
    /// - Parameter changeToken: The previous change token for the database, if any.
    init(in database: CKDatabase, on queue: OperationQueue, changeToken: CKServerChangeToken?) {
        self.changeToken = changeToken
        self.queue = queue
        self.database = database
    }

    // MARK: Publisher Implementation

    /// Attaches `subscriber` by handing it a fresh subscription configured with this publisher's settings.
    func receive<S>(subscriber: S) where S: Subscriber, Never == S.Failure, Action == S.Input {
        subscriber.receive(
            subscription: Subscription(subscriber: subscriber, changeToken: changeToken, in: database, on: queue)
        )
    }
}
// MARK: - Subscription

private extension FetchDatabaseChangesPublisher {
    /// The subscription wraps the actual operation execution and emits actions to its subscriber.
    final class Subscription<S: Subscriber> where S.Input == Output, S.Failure == Failure {
        // MARK: Properties

        /// The active subscriber receiving input, if any.
        private var subscriber: S?

        /// The operation queue to execute the operations on.
        private let queue: OperationQueue

        /// The CloudKit database to run the operations against.
        private let database: CKDatabase

        /// Operations that have been configured and have not yet completed.
        private var operations: [CKFetchDatabaseChangesOperation] = []

        /// The most recent change token for the database. Reset to `nil` when the server reports it expired.
        private var changeToken: CKServerChangeToken?

        /// Forwarded to each operation's `fetchAllChanges` property.
        private let fetchAllChanges: Bool

        /// Internal state, makes sure we only start one operation initially.
        private var didStart: Bool = false

        // MARK: Initialization

        /// Creates a new `FetchDatabaseChangesPublisher.Subscription`.
        ///
        /// - Parameter subscriber: The subscriber to notify.
        /// - Parameter changeToken: The change token for the database.
        /// - Parameter fetchAllChanges: Forwarded to each operation's `fetchAllChanges` property.
        /// - Parameter database: The CloudKit database to perform the operation on.
        /// - Parameter queue: An operation queue to run the operation.
        init(
            subscriber: S,
            changeToken: CKServerChangeToken?,
            fetchAllChanges: Bool = true,
            in database: CKDatabase,
            on queue: OperationQueue
        ) {
            self.subscriber = subscriber
            self.database = database
            self.changeToken = changeToken
            self.fetchAllChanges = fetchAllChanges
            self.queue = queue
        }

        // MARK: Operation

        /// Delivers an action to the subscriber, ignoring the returned demand.
        private func send(_ action: Output) {
            _ = subscriber?.receive(action)
        }

        /// Configures an operation and sets up the callbacks to send events.
        ///
        /// The new operation is recorded in `operations` but is NOT added to the queue; the caller does that.
        ///
        /// - Parameter changeToken: The previous change token, if any.
        ///
        /// - Returns: A new `CKFetchDatabaseChangesOperation`.
        private func configureOperation(
            changeToken: CKServerChangeToken? = nil
        ) -> CKFetchDatabaseChangesOperation {
            let operation = CKFetchDatabaseChangesOperation(previousServerChangeToken: changeToken)
            operation.fetchAllChanges = fetchAllChanges
            operation.database = database
            operation.qualityOfService = .userInitiated
            operation.resultsLimit = 0

            operation.recordZoneWithIDChangedBlock = { [weak self] zoneID in
                self?.send(.recordZoneChanged(zoneID))
            }
            operation.recordZoneWithIDWasDeletedBlock = { [weak self] zoneID in
                self?.send(.recordZoneDeleted(zoneID))
            }
            operation.recordZoneWithIDWasPurgedBlock = { [weak self] zoneID in
                self?.send(.recordZonePurged(zoneID))
            }
            operation.changeTokenUpdatedBlock = { [weak self] token in
                self?.send(.changeTokenUpdated(token))
            }

            // `operation` is captured weakly so the block stored on the operation does not retain it.
            operation.fetchDatabaseChangesCompletionBlock = { [weak self, weak operation] token, moreComing, error in
                guard let self = self else { return }

                // The operation is done; stop tracking it so `operations` only holds unfinished work.
                if let finished = operation {
                    self.operations.removeAll { $0 === finished }
                }

                if let token = token {
                    self.changeToken = token
                    self.send(.changeTokenUpdated(token))
                }

                if let error = error {
                    self.handleError(error)
                } else if moreComing {
                    self.queue.addOperation(self.configureOperation(changeToken: token))
                } else {
                    self.subscriber?.receive(completion: .finished)
                }
            }

            operations.append(operation)
            return operation
        }

        /// Handle CloudKit errors.
        ///
        /// - An expired change token restarts the fetch without a token.
        /// - An error that carries a retry delay re-runs the fetch after that delay with the latest token.
        /// - Anything else is reported to the subscriber as `.unrecoverableError`.
        ///
        /// - Parameter error: The error to handle.
        private func handleError(_ error: Error) {
            if error.isCloudKitTokenExpiredError {
                // Forget the expired token so a later delayed retry does not submit it again.
                changeToken = nil
                queue.addOperation(configureOperation(changeToken: nil))
            } else if let retryDelay = error.delayIfRetryPossible() {
                queue.schedule(after: .init(Date() + retryDelay)) { [weak self] in
                    guard let self = self else { return }
                    self.queue.addOperation(self.configureOperation(changeToken: self.changeToken))
                }
            } else {
                send(.unrecoverableError(error))
            }
        }
    }
}
// MARK: - | |
extension FetchDatabaseChangesPublisher.Subscription: Cancellable {
    /// Drops the subscriber so no further events are delivered, then cancels every tracked operation.
    func cancel() {
        subscriber = nil
        operations.forEach { $0.cancel() }
    }
}
extension FetchDatabaseChangesPublisher.Subscription: Subscription {
    /// Starts the first fetch operation on the first positive demand.
    ///
    /// Requests made after the fetch has started, or after the subscriber has been dropped, do nothing.
    func request(_ demand: Subscribers.Demand) {
        guard subscriber != nil, demand > 0, !didStart else { return }

        let firstOperation = configureOperation(changeToken: changeToken)
        queue.addOperation(firstOperation)
        didStart = true
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import CloudKit | |
import Combine | |
/// A publisher that fetches record changes for a set of record zones and reports them as `Action` values.
///
/// Changed and deleted records are delivered one by one through `.recordChanged` and `.recordDeleted`.
///
/// The publisher's failure type is `Never`; problems are delivered as actions (`.retryingZone`,
/// `.unrecoverableError`) rather than as a failure completion. Each subscriber gets its own subscription, and
/// work starts once that subscriber requests demand.
struct FetchRecordZoneChangesPublisher: Publisher {
    typealias Output = Action
    typealias Failure = Never

    // MARK: Types

    /// The events emitted by the publisher.
    enum Action {
        /// The change token of a record zone was updated.
        case recordZoneChangeTokenUpdated(zoneID: CKRecordZone.ID, token: CKServerChangeToken?, clientToken: Data?)

        /// A new or updated record was received from the server.
        case recordChanged(CKRecord)

        /// A record was deleted on the server. Carries the record's ID and its record type.
        case recordDeleted(CKRecord.ID, CKRecord.RecordType)

        /// The fetch finished for one zone. Carries the zone ID, the server change token, the client token, and
        /// the "more coming" flag reported by the operation.
        case zoneFetchComplete(CKRecordZone.ID, CKServerChangeToken?, Data?, Bool)

        /// An error occurred and the changes of the given zone are being fetched again.
        case retryingZone(CKRecordZone.ID)

        /// An error occurred that the subscription did not recover from. The zone ID is `nil` when the error is
        /// not tied to a single zone.
        case unrecoverableError(CKRecordZone.ID?, Error)
    }

    // MARK: Properties

    /// The CloudKit database to fetch changes from.
    private let database: CKDatabase

    /// The queue that executes the fetch operations.
    private let queue: OperationQueue

    /// The zones to fetch, each mapped to its previous change token (or `nil` if it has none).
    private let zoneTokens: [CKRecordZone.ID: CKServerChangeToken?]

    // MARK: Initialization

    /// Creates a new `FetchRecordZoneChangesPublisher`.
    ///
    /// - Parameter database: The CloudKit database to fetch changes from.
    /// - Parameter queue: The operation queue that executes the fetch operations.
    /// - Parameter zoneTokens: The zones to fetch, each mapped to its previous change token.
    init(in database: CKDatabase, on queue: OperationQueue, zoneTokens: [CKRecordZone.ID: CKServerChangeToken?]) {
        self.zoneTokens = zoneTokens
        self.queue = queue
        self.database = database
    }

    // MARK: Publisher Implementation

    /// Attaches `subscriber` by handing it a fresh subscription configured with this publisher's settings.
    func receive<S>(subscriber: S) where S: Subscriber, Never == S.Failure, Action == S.Input {
        subscriber.receive(
            subscription: Subscription(subscriber: subscriber, zoneTokens: zoneTokens, in: database, on: queue)
        )
    }
}
// MARK: - Subscription | |
private extension FetchRecordZoneChangesPublisher {
    /// The subscription wraps the actual operation execution and emits actions to its subscriber.
    final class Subscription<S: Subscriber> where S.Input == Output, S.Failure == Failure {
        // MARK: Properties

        /// The active subscriber receiving input, if any.
        private var subscriber: S?

        /// The operation queue to execute the operations on.
        private let queue: OperationQueue

        /// The CloudKit database to run the operations against.
        private let database: CKDatabase

        /// Operations that have been configured and have not yet completed.
        private var operations: [CKFetchRecordZoneChangesOperation] = []

        /// The zones and their most recent tokens. Updated as tokens arrive and reset when a token expires.
        private var zoneTokens: [CKRecordZone.ID: CKServerChangeToken?]

        /// Forwarded to each operation's `fetchAllChanges` property.
        private let fetchAllChanges: Bool

        /// Internal state, makes sure we only start one operation initially.
        private var didStart: Bool = false

        // MARK: Initialization

        /// Creates a new `FetchRecordZoneChangesPublisher.Subscription`.
        ///
        /// - Parameter subscriber: The subscriber to notify.
        /// - Parameter zoneTokens: The zones to fetch and their respective tokens.
        /// - Parameter fetchAllChanges: Forwarded to each operation's `fetchAllChanges` property.
        /// - Parameter database: The CloudKit database to perform the operation on.
        /// - Parameter queue: An operation queue to run the operation.
        init(
            subscriber: S,
            zoneTokens: [CKRecordZone.ID: CKServerChangeToken?],
            fetchAllChanges: Bool = true,
            in database: CKDatabase,
            on queue: OperationQueue
        ) {
            self.subscriber = subscriber
            self.database = database
            self.zoneTokens = zoneTokens
            self.fetchAllChanges = fetchAllChanges
            self.queue = queue
        }

        // MARK: Operation

        /// Configures an operation and sets up the callbacks to send events.
        ///
        /// The new operation is recorded in `operations` but is NOT added to the queue; the caller does that.
        ///
        /// - Parameter zoneTokens: The zones to fetch and their respective tokens.
        ///
        /// - Returns: A new `CKFetchRecordZoneChangesOperation`.
        private func configureOperation(
            zoneTokens: [CKRecordZone.ID: CKServerChangeToken?]
        ) -> CKFetchRecordZoneChangesOperation {
            let configurations = Dictionary(uniqueKeysWithValues: zoneTokens.map { id, token in
                (id, CKFetchRecordZoneChangesOperation.ZoneConfiguration(
                    previousServerChangeToken: token,
                    resultsLimit: nil,
                    desiredKeys: nil
                ))
            })
            let operation = CKFetchRecordZoneChangesOperation(
                recordZoneIDs: Array(zoneTokens.keys),
                configurationsByRecordZoneID: configurations
            )
            operation.fetchAllChanges = fetchAllChanges
            operation.database = database
            operation.qualityOfService = .userInitiated

            operation.recordZoneChangeTokensUpdatedBlock = { [weak self] zoneID, token, clientToken in
                // Remember the newest token so a delayed retry for this zone resumes from it.
                self?.zoneTokens[zoneID] = token
                _ = self?.subscriber?.receive(.recordZoneChangeTokenUpdated(
                    zoneID: zoneID,
                    token: token,
                    clientToken: clientToken
                ))
            }
            operation.recordChangedBlock = { [weak self] record in
                _ = self?.subscriber?.receive(.recordChanged(record))
            }
            operation.recordWithIDWasDeletedBlock = { [weak self] id, type in
                _ = self?.subscriber?.receive(.recordDeleted(id, type))
            }
            operation.recordZoneFetchCompletionBlock = { [weak self] zoneID, changeToken, clientToken, moreComing, error in
                if let error = error {
                    self?.handleError(error, for: zoneID)
                } else {
                    _ = self?.subscriber?.receive(.zoneFetchComplete(zoneID, changeToken, clientToken, moreComing))
                }
            }
            operation.fetchRecordZoneChangesCompletionBlock = { [weak self] error in
                guard let strongSelf = self else { return }

                // The operation is done; stop tracking it.
                if let index = strongSelf.operations.firstIndex(of: operation) {
                    strongSelf.operations.remove(at: index)
                }

                if let error = error {
                    strongSelf.handleError(error, for: nil)
                } else if strongSelf.operations.isEmpty {
                    // Nothing else is tracked, so the stream is complete.
                    strongSelf.subscriber?.receive(completion: .finished)
                }
            }

            operations.append(operation)
            return operation
        }

        /// Handle CloudKit errors.
        ///
        /// - An expired token for a known zone resets that zone's token and refetches the zone from scratch.
        /// - An error with a retry delay for a known zone refetches the zone after that delay.
        /// - Anything else, including every error without a zone, is reported as `.unrecoverableError`.
        ///
        /// - Parameter error: The error to handle.
        /// - Parameter zoneID: The ID of the zone in which the error occurred, or `nil` if not zone specific.
        private func handleError(_ error: Error, for zoneID: CKRecordZone.ID?) {
            if error.isCloudKitTokenExpiredError, let zoneID = zoneID {
                // Forget the expired token so a later delayed retry for this zone does not submit it again.
                zoneTokens.updateValue(nil, forKey: zoneID)
                _ = subscriber?.receive(.recordZoneChangeTokenUpdated(zoneID: zoneID, token: nil, clientToken: nil))
                _ = subscriber?.receive(.retryingZone(zoneID))
                let newOperation = configureOperation(zoneTokens: [zoneID: nil])
                queue.addOperation(newOperation)
            } else if let retryDelay = error.delayIfRetryPossible(), let zoneID = zoneID {
                queue.schedule(after: .init(Date() + retryDelay)) { [weak self] in
                    guard let strongSelf = self else { return }
                    // The lookup yields a doubly-optional token; `?? nil` flattens a missing entry to "no token".
                    let token = strongSelf.zoneTokens[zoneID] ?? nil
                    let newOperation = strongSelf.configureOperation(zoneTokens: [zoneID: token])
                    strongSelf.queue.addOperation(newOperation)
                }
            } else {
                _ = subscriber?.receive(.unrecoverableError(zoneID, error))
            }
        }
    }
}
// MARK: - | |
extension FetchRecordZoneChangesPublisher.Subscription: Cancellable {
    /// Drops the subscriber so no further events are delivered, then cancels every tracked operation.
    func cancel() {
        subscriber = nil
        operations.forEach { $0.cancel() }
    }
}
extension FetchRecordZoneChangesPublisher.Subscription: Subscription {
    /// Starts the first fetch operation on the first positive demand.
    ///
    /// Requests made after the fetch has started, or after the subscriber has been dropped, do nothing.
    func request(_ demand: Subscribers.Demand) {
        guard subscriber != nil, demand > 0, !didStart else { return }

        let firstOperation = configureOperation(zoneTokens: zoneTokens)
        queue.addOperation(firstOperation)
        didStart = true
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import CloudKit | |
import Combine | |
/// A publisher that saves and/or deletes a batch of records in a CloudKit database and reports progress and
/// results as `Action` values.
struct ModifyRecordsPublisher: Publisher {
    typealias Output = Action
    typealias Failure = Never

    // MARK: Types

    /// The events emitted by the publisher.
    enum Action {
        /// Upload progress changed for a record. Carries the record and the reported progress value.
        case didUpdateProgress(CKRecord, Double)

        /// A conflict could not be resolved.
        case failedToResolveConflict

        /// A record failed to save.
        case failedToSaveRecord(CKRecord, Error)

        /// A record was saved.
        case recordSaved(CKRecord)

        /// Records were deleted.
        case recordsDeleted([CKRecord.ID])

        /// The sync operation completed.
        case syncCompleted

        /// An unknown error occurred.
        case unknownError(Error)

        /// The upload limit was exceeded; the items need to be sent in smaller batches.
        case limitExceeded
    }

    // MARK: Properties

    /// The CloudKit database to perform the operation on.
    private let database: CKDatabase

    /// The records to save, if any.
    private let recordsToSave: [CKRecord]?

    /// The IDs of the records to delete, if any.
    private let recordIDsToDelete: [CKRecord.ID]?

    /// The queue that executes the operation.
    private let queue: OperationQueue

    /// A closure that reconciles the client's and the server's version of a conflicting record.
    private let resolver: (_ client: CKRecord, _ server: CKRecord) -> CKRecord?

    // MARK: Initialization

    /// Creates a new `ModifyRecordsPublisher`.
    ///
    /// - Parameter recordsToSave: The records to save.
    /// - Parameter recordIDsToDelete: The IDs of the records to delete.
    /// - Parameter database: The CloudKit database to perform the operation on.
    /// - Parameter queue: The queue that executes the operation.
    /// - Parameter resolver: A closure that can resolve a conflict between two records.
    /// - Parameter client: The client's version of the record.
    /// - Parameter server: The server's version of the record.
    init(
        recordsToSave: [CKRecord]?,
        recordIDsToDelete: [CKRecord.ID]?,
        database: CKDatabase,
        queue: OperationQueue,
        resolver: @escaping (_ client: CKRecord, _ server: CKRecord) -> CKRecord?
    ) {
        self.recordsToSave = recordsToSave
        self.recordIDsToDelete = recordIDsToDelete
        self.database = database
        self.queue = queue
        self.resolver = resolver
    }

    // MARK: Publisher

    /// Attaches `subscriber` by handing it a fresh subscription configured with this publisher's settings.
    func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input {
        subscriber.receive(
            subscription: Subscription(
                subscriber: subscriber,
                recordsToSave: recordsToSave,
                recordIDsToDelete: recordIDsToDelete,
                database: database,
                queue: queue,
                resolver: resolver
            )
        )
    }
}
// MARK: - Subscription | |
extension ModifyRecordsPublisher {
    /// The subscription that wraps the actual operation execution and emits actions to its subscriber.
    final class Subscription<S: Subscriber> where S.Input == Output, S.Failure == Failure {
        /// The subscriber to notify of events.
        private var subscriber: S?

        /// The CloudKit database to perform the operation on.
        private var database: CKDatabase

        /// Records waiting to be saved: the initial batch before the first operation is configured, and
        /// afterwards any records produced by conflict resolution that still need to be uploaded.
        private var recordsToSave: [CKRecord]?

        /// An array of record IDs to delete.
        private var recordIDsToDelete: [CKRecord.ID]?

        /// A queue for the operation.
        private let queue: OperationQueue

        /// A closure that can resolve a conflict between two records.
        private let resolver: (_ client: CKRecord, _ server: CKRecord) -> CKRecord?

        /// Any in-flight operations.
        private var operations: [CKModifyRecordsOperation] = []

        // MARK: Initialization

        /// Creates a new `ModifyRecordsPublisher.Subscription`.
        ///
        /// - Parameter subscriber: The subscriber to notify.
        /// - Parameter recordsToSave: An array of records to save.
        /// - Parameter recordIDsToDelete: An array of record IDs to delete.
        /// - Parameter database: The CloudKit database to perform the operation on.
        /// - Parameter queue: A queue for the operation.
        /// - Parameter resolver: A closure that can resolve a conflict between two records.
        /// - Parameter client: The client's version of the record.
        /// - Parameter server: The server's version of the record.
        init(
            subscriber: S,
            recordsToSave: [CKRecord]?,
            recordIDsToDelete: [CKRecord.ID]?,
            database: CKDatabase,
            queue: OperationQueue,
            resolver: @escaping (_ client: CKRecord, _ server: CKRecord) -> CKRecord?
        ) {
            self.subscriber = subscriber
            self.database = database
            self.queue = queue
            self.recordsToSave = recordsToSave
            self.recordIDsToDelete = recordIDsToDelete
            self.resolver = resolver
        }

        // MARK: Operation

        /// Creates and configures an operation.
        ///
        /// Might be called multiple times to retry resolved conflicts or retry other errors. The operation is
        /// neither enqueued nor added to `operations` here; the caller does both.
        ///
        /// - Parameter recordsToSave: An array of records to save.
        /// - Parameter recordIDsToDelete: An array of record IDs to delete.
        ///
        /// - Returns: A `CKModifyRecordsOperation`.
        private func configureOperation(
            recordsToSave: [CKRecord]?,
            recordIDsToDelete: [CKRecord.ID]?
        ) -> CKModifyRecordsOperation {
            let operation = CKModifyRecordsOperation(recordsToSave: recordsToSave, recordIDsToDelete: recordIDsToDelete)
            operation.savePolicy = .changedKeys
            operation.qualityOfService = .userInitiated
            operation.database = database

            operation.perRecordProgressBlock = { [weak self] record, progress in
                _ = self?.subscriber?.receive(.didUpdateProgress(record, progress))
            }
            operation.perRecordCompletionBlock = { [weak self] record, error in
                guard let error = error else {
                    _ = self?.subscriber?.receive(.recordSaved(record))
                    return
                }
                self?.handleRecordError(record: record, error: error)
            }
            operation.modifyRecordsCompletionBlock = { [weak self] _, deleted, error in
                guard let strongSelf = self else { return }

                // The operation is done; stop tracking it.
                if let index = strongSelf.operations.firstIndex(of: operation) {
                    strongSelf.operations.remove(at: index)
                }

                if let error = error {
                    strongSelf.handleUploadError(
                        error: error,
                        recordsToSave: recordsToSave,
                        recordIDsToDelete: recordIDsToDelete
                    )
                } else if strongSelf.saveResolvedRecords() {
                    // A follow-up operation for resolved conflicts is now in flight; it reports completion.
                } else {
                    if let deleted = deleted {
                        _ = strongSelf.subscriber?.receive(.recordsDeleted(deleted))
                    }
                    if strongSelf.operations.isEmpty {
                        _ = strongSelf.subscriber?.receive(.syncCompleted)
                    }
                }
            }
            return operation
        }

        /// Starts a follow-up operation that saves the records produced by conflict resolution.
        ///
        /// - Returns: `true` if there were pending records and an operation was started, otherwise `false`.
        private func saveResolvedRecords() -> Bool {
            guard let pending = recordsToSave, !pending.isEmpty else { return false }
            recordsToSave = nil

            let followUp = configureOperation(recordsToSave: pending, recordIDsToDelete: nil)
            operations.append(followUp)
            queue.addOperation(followUp)
            return true
        }

        /// Handle a single record error.
        ///
        /// - Parameter record: The record for which the error occurred.
        /// - Parameter error: The error that occurred.
        private func handleRecordError(record: CKRecord, error: Error) {
            // Only conflicts are handled here; every other per-record failure is just reported.
            guard error.isCloudKitConflictError else {
                _ = subscriber?.receive(.failedToSaveRecord(record, error))
                return
            }

            // Attempt to resolve conflict.
            guard let resolved = error.resolveConflict(with: resolver) else {
                _ = subscriber?.receive(.failedToResolveConflict)
                return
            }

            // Queue up the resolved record to be saved. `recordsToSave` is `nil` once the first operation has
            // been started, so the array is created on demand; optional chaining would drop the record.
            recordsToSave = (recordsToSave ?? []) + [resolved]
        }

        /// Handle errors reported for a whole operation.
        ///
        /// - A non-CloudKit error is reported as `.unknownError` and finishes the stream.
        /// - `.limitExceeded` is reported as `.limitExceeded`.
        /// - An error with a retry delay re-runs the same batch after that delay.
        /// - A `.partialFailure` with resolved conflicts pending re-submits those resolved records.
        /// - Anything else is reported as `.unknownError`.
        ///
        /// - Parameter error: The error to handle.
        /// - Parameter recordsToSave: An array of the records that failed to save.
        /// - Parameter recordIDsToDelete: An array of the record IDs that could not be deleted.
        private func handleUploadError(error: Error, recordsToSave: [CKRecord]?, recordIDsToDelete: [CKRecord.ID]?) {
            guard let ckError = error as? CKError else {
                _ = subscriber?.receive(.unknownError(error))
                subscriber?.receive(completion: .finished)
                return
            }

            if ckError.code == .limitExceeded {
                _ = subscriber?.receive(.limitExceeded)
            } else if let retryDelay = ckError.retryAfterSeconds {
                let newOperation = configureOperation(recordsToSave: recordsToSave, recordIDsToDelete: recordIDsToDelete)
                // Track the retry right away so `cancel()` reaches it even before it is enqueued.
                operations.append(newOperation)
                queue.schedule(after: .init(Date() + retryDelay)) { [weak self] in
                    self?.queue.addOperation(newOperation)
                }
            } else if ckError.code == .partialFailure, saveResolvedRecords() {
                // Per-record errors were already handled in `perRecordCompletionBlock`; the resolved
                // conflicts have just been re-submitted.
            } else {
                // Surface the failure instead of dropping it silently.
                _ = subscriber?.receive(.unknownError(error))
            }
        }
    }
}
// MARK: - | |
extension ModifyRecordsPublisher.Subscription: Cancellable {
    /// Drops the subscriber so no further events are delivered, then cancels every tracked operation.
    func cancel() {
        subscriber = nil
        operations.forEach { $0.cancel() }
    }
}
extension ModifyRecordsPublisher.Subscription: Subscription {
    /// Starts the modify operation on the first positive demand.
    ///
    /// Nothing happens if the subscriber has been dropped, if an operation is already tracked, or if no values
    /// are requested.
    func request(_ demand: Subscribers.Demand) {
        guard subscriber != nil, operations.isEmpty, demand > 0 else { return }

        let operation = configureOperation(recordsToSave: recordsToSave, recordIDsToDelete: recordIDsToDelete)
        // The batch now belongs to the operation; clear the stored copies.
        recordsToSave = nil
        recordIDsToDelete = nil

        // Track the operation so `cancel()` can cancel it and repeated demand does not start a second one.
        operations.append(operation)
        queue.addOperation(operation)
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import CloudKit | |
import Combine | |
/// A publisher that creates and/or deletes record zones in a CloudKit database and reports the outcome.
public struct ModifyRecordZonesPublisher: Publisher {
    public typealias Output = Action
    public typealias Failure = CloudKitClientError

    // MARK: Types

    /// The events emitted by the publisher.
    public enum Action: Equatable {
        /// The operation completed. The associated values hold the zones that were created and the IDs of the
        /// zones that were deleted.
        case operationCompleted(created: [CKRecordZone]?, deleted: [CKRecordZone.ID]?)
    }

    // MARK: Properties

    /// The zones to create.
    private let zones: [CKRecordZone]

    /// The IDs of the zones to delete.
    private let delete: [CKRecordZone.ID]

    /// The database in which the zones are modified.
    private let database: CKDatabase

    /// The queue that executes the operation, or `nil` to use no dedicated queue.
    private let queue: OperationQueue?

    // MARK: Initialization

    /// Creates a new `ModifyRecordZonesPublisher`.
    ///
    /// - Parameter create: The record zones to create.
    /// - Parameter delete: The IDs of the record zones to delete. Defaults to none.
    /// - Parameter database: The database in which the zones are modified.
    /// - Parameter queue: An optional operation queue to run the operation on.
    public init(
        create: [CKRecordZone],
        delete: [CKRecordZone.ID] = [],
        in database: CKDatabase,
        on queue: OperationQueue? = nil
    ) {
        self.queue = queue
        self.database = database
        self.delete = delete
        self.zones = create
    }

    // MARK: Publisher Implementation

    /// Attaches `subscriber` by handing it a fresh subscription configured with this publisher's settings.
    public func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input {
        subscriber.receive(
            subscription: Subscription(
                subscriber: subscriber,
                zones: zones,
                toDelete: delete,
                in: database,
                on: queue
            )
        )
    }
}
// MARK: - Subscription | |
private extension ModifyRecordZonesPublisher {
  /// The subscription that wraps the actual operation execution and emits actions to its subscriber.
  final class Subscription<S: Subscriber> where S.Input == Output, S.Failure == Failure {
    // MARK: Properties

    /// The subscriber to notify of events. Cleared on cancellation and once a completion has been delivered.
    private var subscriber: S?
    /// The database to modify the zones in.
    private let database: CKDatabase
    /// An operation queue to run the operation. If `nil` the operation is added to the database directly.
    private let queue: OperationQueue?
    /// The most recently started operation, if any.
    private var operation: CKModifyRecordZonesOperation?
    /// The zones to create.
    private let zones: [CKRecordZone]
    /// The zone IDs to delete.
    private let delete: [CKRecordZone.ID]

    // MARK: Initialization

    /// Creates a new `ModifyRecordZonesPublisher.Subscription`.
    ///
    /// - Parameter subscriber: The subscriber to notify.
    /// - Parameter zones: The zones to create.
    /// - Parameter delete: The zone IDs to delete.
    /// - Parameter database: The database to modify the zones in.
    /// - Parameter queue: An optional operation queue to run the operation.
    ///
    /// - Returns: A new `ModifyRecordZonesPublisher.Subscription`.
    init(
      subscriber: S,
      zones: [CKRecordZone],
      toDelete delete: [CKRecordZone.ID],
      in database: CKDatabase,
      on queue: OperationQueue?
    ) {
      self.subscriber = subscriber
      self.database = database
      self.queue = queue
      self.zones = zones
      self.delete = delete
    }

    // MARK: Operation

    /// Configures the operation, sets up the callbacks to send events and starts it.
    private func configureOperation() {
      let operation = CKModifyRecordZonesOperation(recordZonesToSave: zones, recordZoneIDsToDelete: delete)
      operation.database = database
      operation.qualityOfService = .utility
      operation.modifyRecordZonesCompletionBlock = { [weak self] saved, deleted, error in
        guard let self = self else { return }
        if let error = error as? CKError {
          self.handleError(error)
        } else if let error = error {
          self.finish(with: .failure(.unknownError(error as NSError)))
        } else {
          _ = self.subscriber?.receive(.operationCompleted(created: saved, deleted: deleted))
          self.finish(with: .finished)
        }
      }
      self.operation = operation
      if let queue = queue {
        queue.addOperation(operation)
      } else {
        database.add(operation)
      }
    }

    /// Handle CloudKit errors.
    ///
    /// If the error carries a retry delay the operation is re-run after that delay, otherwise the
    /// error is forwarded to the subscriber as a failure.
    ///
    /// - Parameter error: The error to handle.
    private func handleError(_ error: CKError) {
      guard let retryDelay = error.delayIfRetryPossible() else {
        finish(with: .failure(.cloudKitError(error)))
        return
      }

      let retry: () -> Void = { [weak self] in
        // The subscription may have been cancelled while waiting for the retry; a cancelled
        // subscription must not start a new operation.
        guard let self = self, self.subscriber != nil else { return }
        self.configureOperation()
      }

      if let queue = queue {
        queue.schedule(after: .init(Date() + retryDelay)) { retry() }
      } else {
        // No operation queue was supplied, so there is nothing of our own to schedule on. Use a
        // global queue so a retryable error is retried regardless of how the publisher was created.
        DispatchQueue.global(qos: .utility).asyncAfter(deadline: .now() + retryDelay) { retry() }
      }
    }

    /// Delivers a terminal event and releases the subscriber so that no further events can be sent.
    ///
    /// - Parameter completion: The completion to deliver.
    private func finish(with completion: Subscribers.Completion<S.Failure>) {
      guard let subscriber = subscriber else { return }
      self.subscriber = nil
      subscriber.receive(completion: completion)
    }
  }
}
// MARK: - | |
extension ModifyRecordZonesPublisher.Subscription: Cancellable {
  /// Stops delivering events and cancels the operation, if one has been started.
  func cancel() {
    // Drop the subscriber before cancelling so that nothing the cancelled operation
    // reports afterwards can reach it.
    subscriber = nil
    if let inFlight = operation {
      inFlight.cancel()
    }
  }
}
extension ModifyRecordZonesPublisher.Subscription: Subscription {
  /// Starts the operation the first time a positive demand is requested.
  ///
  /// - Parameter demand: The number of values the subscriber asks for.
  func request(_ demand: Subscribers.Demand) {
    let hasSubscriber = subscriber != nil
    let notYetStarted = operation == nil
    if hasSubscriber && notYetStarted && demand > 0 {
      configureOperation()
    }
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import CloudKit | |
import Combine | |
/// A `RecordSubscriptionPublisher` wraps a `CKModifySubscriptionsOperation` and emits actions whenever the | |
/// operation returns a callback. | |
public struct RecordSubscriptionPublisher: Publisher {
  // MARK: Types

  /// The events emitted by the publisher.
  public enum Action: Equatable {
    /// A subscription with the given id was saved. The zone id is the one the publisher was created with.
    case savedSubscription(CKRecordZone.ID?, CKSubscription.ID)
  }

  public typealias Output = Action
  public typealias Failure = CloudKitClientError

  // MARK: Properties

  /// The id of the subscription to create.
  private let subscriptionID: CKSubscription.ID
  /// The id of the zone to create the subscription for, if any.
  private let zoneID: CKRecordZone.ID?
  /// The CloudKit database to perform the operation on.
  private let database: CKDatabase
  /// An operation queue to run the operation.
  private let queue: OperationQueue

  // MARK: Initialization

  /// Creates a new `RecordSubscriptionPublisher`.
  ///
  /// - Parameter database: The CloudKit database to perform the operation on.
  /// - Parameter queue: An operation queue to run the operation.
  /// - Parameter zoneID: The id of the zone to create the subscription for. Defaults to `nil`.
  /// - Parameter subscriptionID: The id of the subscription to create.
  ///
  /// - Returns: A new `RecordSubscriptionPublisher`.
  public init(
    in database: CKDatabase,
    on queue: OperationQueue,
    zoneID: CKRecordZone.ID? = nil,
    subscriptionID: CKSubscription.ID
  ) {
    self.subscriptionID = subscriptionID
    self.zoneID = zoneID
    self.queue = queue
    self.database = database
  }

  // MARK: Publisher Implementation

  public func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input {
    // The operation itself is only started once the subscriber requests values.
    subscriber.receive(
      subscription: Subscription<S>(
        subscriber: subscriber,
        database: database,
        queue: queue,
        zoneID: zoneID,
        subscriptionID: subscriptionID
      )
    )
  }
}
// MARK: - Subscription | |
private extension RecordSubscriptionPublisher {
  /// The subscription that wraps the actual operation execution and emits actions to its subscriber.
  final class Subscription<S: Subscriber> where S.Input == Output, S.Failure == Failure {
    // MARK: Properties

    /// The subscriber to notify of events. Cleared on cancellation and once a completion has been delivered.
    private var subscriber: S?
    /// The most recently started operation, if any.
    private var operation: CKModifySubscriptionsOperation?
    /// The CloudKit database to perform the operation on.
    private let database: CKDatabase
    /// An operation queue to run the operation and to schedule retries on.
    private let queue: OperationQueue
    /// The id of the zone to create the subscription for.
    private let zoneID: CKRecordZone.ID?
    /// The id of the subscription to create.
    private let subscriptionID: CKSubscription.ID

    // MARK: Initialization

    /// Creates a new `RecordSubscriptionPublisher.Subscription`.
    ///
    /// - Parameter subscriber: The subscriber to notify.
    /// - Parameter database: The CloudKit database to perform the operation on.
    /// - Parameter queue: An operation queue to run the operation.
    /// - Parameter zoneID: The id of the zone to create the subscription for.
    /// - Parameter subscriptionID: The id of the subscription to create.
    ///
    /// - Returns: A new `RecordSubscriptionPublisher.Subscription`.
    init(
      subscriber: S,
      database: CKDatabase,
      queue: OperationQueue,
      zoneID: CKRecordZone.ID?,
      subscriptionID: CKSubscription.ID
    ) {
      self.subscriber = subscriber
      self.database = database
      self.queue = queue
      self.zoneID = zoneID
      self.subscriptionID = subscriptionID
    }

    // MARK: Operations

    /// Creates, configures and enqueues a `CKModifySubscriptionsOperation` to communicate with CloudKit.
    ///
    /// A zone subscription is created when `zoneID` is non-`nil`, otherwise a database subscription.
    ///
    /// - Parameter zoneID: The id of the zone to create the subscription for.
    /// - Parameter subscriptionID: The id of the subscription to create.
    private func configureOperation(zoneID: CKRecordZone.ID?, subscriptionID: CKSubscription.ID) {
      let subscription: CKSubscription
      if let zoneID = zoneID {
        subscription = CKRecordZoneSubscription(zoneID: zoneID, subscriptionID: subscriptionID)
      } else {
        subscription = CKDatabaseSubscription(subscriptionID: subscriptionID)
      }

      let notificationInfo = CKSubscription.NotificationInfo()
      notificationInfo.shouldSendContentAvailable = true
      // NOTE(review): "hej" looks like a debugging placeholder. Setting an alert body presumably makes
      // the notification user-visible rather than silent — confirm whether this is intended.
      notificationInfo.alertBody = "hej"
      subscription.notificationInfo = notificationInfo

      let operation = CKModifySubscriptionsOperation(subscriptionsToSave: [subscription], subscriptionIDsToDelete: nil)
      operation.database = database
      operation.qualityOfService = .userInitiated
      operation.modifySubscriptionsCompletionBlock = { [weak self] _, _, error in
        guard let self = self else { return }
        if let error = error as? CKError {
          self.handleError(error: error)
        } else if let error = error {
          self.finish(with: .failure(.unknownError(error as NSError)))
        } else {
          _ = self.subscriber?.receive(.savedSubscription(zoneID, subscriptionID))
          self.finish(with: .finished)
        }
      }
      self.operation = operation
      queue.addOperation(operation)
    }

    /// Handle CloudKit errors.
    ///
    /// If the error carries a retry delay the operation is re-run after that delay, otherwise the
    /// error is forwarded to the subscriber as a failure.
    ///
    /// - Parameter error: The error to handle.
    private func handleError(error: CKError) {
      guard let retryDelay = error.delayIfRetryPossible() else {
        finish(with: .failure(.cloudKitError(error)))
        return
      }
      queue.schedule(after: .init(Date() + retryDelay)) { [weak self] in
        // The subscription may have been cancelled while waiting for the retry; a cancelled
        // subscription must not start a new operation.
        guard let self = self, self.subscriber != nil else { return }
        self.configureOperation(zoneID: self.zoneID, subscriptionID: self.subscriptionID)
      }
    }

    /// Delivers a terminal event and releases the subscriber so that no further events can be sent.
    ///
    /// - Parameter completion: The completion to deliver.
    private func finish(with completion: Subscribers.Completion<S.Failure>) {
      guard let subscriber = subscriber else { return }
      self.subscriber = nil
      subscriber.receive(completion: completion)
    }
  }
}
// MARK: - | |
extension RecordSubscriptionPublisher.Subscription: Cancellable {
  /// Stops delivering events and cancels the operation, if one has been started.
  func cancel() {
    // Drop the subscriber before cancelling so that nothing the cancelled operation
    // reports afterwards can reach it.
    subscriber = nil
    if let inFlight = operation {
      inFlight.cancel()
    }
  }
}
extension RecordSubscriptionPublisher.Subscription: Subscription {
  /// Starts the operation the first time a positive demand is requested.
  ///
  /// - Parameter demand: The number of values the subscriber asks for.
  func request(_ demand: Subscribers.Demand) {
    let hasSubscriber = subscriber != nil
    let notYetStarted = operation == nil
    if hasSubscriber && notYetStarted && demand > 0 {
      configureOperation(zoneID: zoneID, subscriptionID: subscriptionID)
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment