Skip to content

Instantly share code, notes, and snippets.

@simme
Created June 9, 2021 14:05
Show Gist options
  • Save simme/958cd9beb545b1d41db2065b337fa732 to your computer and use it in GitHub Desktop.
Save simme/958cd9beb545b1d41db2065b337fa732 to your computer and use it in GitHub Desktop.
CloudKit publishers
import CloudKit
extension Error {
    /// `true` when the error is a `CKError` whose code is `.serverRecordChanged`.
    var isCloudKitConflictError: Bool {
        guard let cloudKitError = self as? CKError else { return false }
        return cloudKitError.code == .serverRecordChanged
    }

    /// `true` when the error is a `CKError` whose code is `.zoneNotFound` or `.userDeletedZone`.
    var isCloudKitZoneDeletedError: Bool {
        guard let cloudKitError = self as? CKError else { return false }
        switch cloudKitError.code {
        case .zoneNotFound, .userDeletedZone:
            return true
        default:
            return false
        }
    }

    /// `true` when the error is a `CKError` whose code is `.changeTokenExpired`.
    var isCloudKitTokenExpiredError: Bool {
        guard let cloudKitError = self as? CKError else { return false }
        return cloudKitError.code == .changeTokenExpired
    }

    /// Attempts to resolve a conflict by handing the client and server records to `resolver`.
    ///
    /// - Parameter resolver: A closure that receives both versions of the record and reconciles their differences.
    /// - Parameter client: The client's record.
    /// - Parameter server: The server's record.
    ///
    /// - Returns: The record produced by `resolver`, or `nil` if the error is not a conflict error, either record
    ///   is missing from the error's `userInfo`, or the resolver itself returns `nil`.
    func resolveConflict(with resolver: (_ client: CKRecord, _ server: CKRecord) -> CKRecord?) -> CKRecord? {
        guard
            let cloudKitError = self as? CKError,
            isCloudKitConflictError,
            let clientRecord = cloudKitError.userInfo[CKRecordChangedErrorClientRecordKey] as? CKRecord,
            let serverRecord = cloudKitError.userInfo[CKRecordChangedErrorServerRecordKey] as? CKRecord
        else {
            return nil
        }
        return resolver(clientRecord, serverRecord)
    }

    /// Returns the delay CloudKit attached to the error, if any.
    ///
    /// - Returns: The value of `retryAfterSeconds` when the error is a `CKError` that carries one, otherwise `nil`.
    func delayIfRetryPossible() -> Double? {
        (self as? CKError)?.retryAfterSeconds
    }
}
import CloudKit
import Combine
/// Looks up the user's CloudKit account status in the given container.
///
/// The lookup is wrapped in `Deferred`, so `accountStatus` is not called until a subscriber attaches.
///
/// - Parameter container: The container to check the status in.
///
/// - Returns: A publisher that emits the account status, or fails with the error reported by CloudKit.
func getAccountStatus(for container: CKContainer) -> AnyPublisher<CKAccountStatus, Error> {
    let deferredStatus = Deferred {
        Future<CKAccountStatus, Error> { promise in
            container.accountStatus { status, error in
                guard let error = error else {
                    promise(.success(status))
                    return
                }
                promise(.failure(error))
            }
        }
    }
    return deferredStatus.eraseToAnyPublisher()
}
extension CKAccountStatus {
    /// `true` only when the status is `.available`, i.e. the user has an active CloudKit account.
    var hasAccount: Bool {
        if case .available = self {
            return true
        }
        return false
    }
}
import CloudKit
import Combine
/// A publisher that wraps a `CKFetchDatabaseChangesOperation` and emits events as the operation reports them.
///
/// The publisher never fails. Errors that cannot be recovered from are delivered as `.unrecoverableError` actions
/// instead, because a failure would terminate the stream.
///
/// A completion event is sent once a fetch finishes without an error and no further changes are pending.
struct FetchDatabaseChangesPublisher: Publisher {

    // MARK: Types

    typealias Output = Action
    typealias Failure = Never

    /// The events emitted by the publisher.
    enum Action {
        /// A record zone was reported as changed.
        case recordZoneChanged(CKRecordZone.ID)
        /// A record zone was reported as deleted.
        case recordZoneDeleted(CKRecordZone.ID)
        /// A record zone was reported as purged.
        case recordZonePurged(CKRecordZone.ID)
        /// A new database change token was received.
        case changeTokenUpdated(CKServerChangeToken)
        /// An error occurred that is neither retried nor recovered from.
        case unrecoverableError(Error)
    }

    // MARK: Properties

    /// The CloudKit database to perform the operation on.
    private let database: CKDatabase

    /// The queue the operation is added to.
    private let queue: OperationQueue

    /// The change token to start fetching from, or `nil` to fetch from the beginning.
    private let changeToken: CKServerChangeToken?

    // MARK: Initialization

    /// Creates a new `FetchDatabaseChangesPublisher`.
    ///
    /// - Parameter database: The CloudKit database to run the operation on.
    /// - Parameter queue: The operation queue responsible for executing the operation.
    /// - Parameter changeToken: The previous change token for the database, if any.
    ///
    /// - Returns: A new `FetchDatabaseChangesPublisher`.
    init(in database: CKDatabase, on queue: OperationQueue, changeToken: CKServerChangeToken?) {
        self.changeToken = changeToken
        self.queue = queue
        self.database = database
    }

    // MARK: Publisher Implementation

    /// Creates a subscription for `subscriber` and hands it over. No work starts until demand is requested.
    func receive<S: Subscriber>(subscriber: S) where S.Failure == Never, S.Input == Action {
        subscriber.receive(
            subscription: Subscription(
                subscriber: subscriber,
                changeToken: changeToken,
                in: database,
                on: queue
            )
        )
    }
}
// MARK: - Subscription
private extension FetchDatabaseChangesPublisher {
    /// The subscription wraps the actual operation execution and emits actions to its subscriber.
    final class Subscription<S: Subscriber> where S.Input == Output, S.Failure == Failure {

        // MARK: Properties

        /// The active subscriber receiving input, if any. Set to `nil` on cancellation.
        private var subscriber: S?

        /// The operation queue to execute the operations on.
        private let queue: OperationQueue

        /// The CloudKit database to run the operations against.
        private let database: CKDatabase

        /// Every operation configured by this subscription, so that they can be cancelled.
        private var operations: [CKFetchDatabaseChangesOperation] = []

        /// The most recent change token for the database.
        private var changeToken: CKServerChangeToken?

        /// `true` if all changes should be fetched. The consumer is responsible for kicking off more operations if
        /// necessary to collect all data.
        private let fetchAllChanges: Bool

        /// Internal state, makes sure we only start one operation initially.
        private var didStart: Bool = false

        // MARK: Initialization

        /// Creates a new `FetchDatabaseChangesPublisher.Subscription`.
        ///
        /// - Parameter subscriber: The subscriber to notify.
        /// - Parameter changeToken: The change token for the database.
        /// - Parameter fetchAllChanges: `true` if all changes should be fetched. The consumer is responsible for
        ///   kicking off more operations if necessary to collect all data.
        /// - Parameter database: The CloudKit database to perform the operation on.
        /// - Parameter queue: An operation queue to run the operation.
        ///
        /// - Returns: A new `FetchDatabaseChangesPublisher.Subscription`.
        init(
            subscriber: S,
            changeToken: CKServerChangeToken?,
            fetchAllChanges: Bool = true,
            in database: CKDatabase,
            on queue: OperationQueue
        ) {
            self.subscriber = subscriber
            self.database = database
            self.changeToken = changeToken
            self.fetchAllChanges = fetchAllChanges
            self.queue = queue
        }

        // MARK: Operation

        /// Configures an operation and sets up the callbacks to send events.
        ///
        /// The operation is recorded in `operations` but is not added to the queue; that is the caller's job.
        ///
        /// - Parameter changeToken: The previous change token, if any.
        ///
        /// - Returns: A new `CKFetchDatabaseChangesOperation`.
        private func configureOperation(
            changeToken: CKServerChangeToken? = nil
        ) -> CKFetchDatabaseChangesOperation {
            let operation = CKFetchDatabaseChangesOperation(previousServerChangeToken: changeToken)
            operation.fetchAllChanges = fetchAllChanges
            operation.database = database
            operation.qualityOfService = .userInitiated
            operation.resultsLimit = 0

            operation.recordZoneWithIDChangedBlock = { [weak self] zoneID in
                _ = self?.subscriber?.receive(.recordZoneChanged(zoneID))
            }
            operation.recordZoneWithIDWasDeletedBlock = { [weak self] zoneID in
                _ = self?.subscriber?.receive(.recordZoneDeleted(zoneID))
            }
            operation.recordZoneWithIDWasPurgedBlock = { [weak self] zoneID in
                _ = self?.subscriber?.receive(.recordZonePurged(zoneID))
            }
            operation.changeTokenUpdatedBlock = { [weak self] token in
                _ = self?.subscriber?.receive(.changeTokenUpdated(token))
            }
            operation.fetchDatabaseChangesCompletionBlock = { [weak self] token, moreComing, error in
                guard let self = self else { return }

                // Remember the final token so that a delayed retry resumes from it.
                if let token = token {
                    self.changeToken = token
                    _ = self.subscriber?.receive(.changeTokenUpdated(token))
                }

                if let error = error {
                    self.handleError(error)
                } else if moreComing {
                    // The server has more changes; continue from the token we just received.
                    self.queue.addOperation(self.configureOperation(changeToken: token))
                } else {
                    self.subscriber?.receive(completion: .finished)
                }
            }

            operations.append(operation)
            return operation
        }

        /// Handle CloudKit errors.
        ///
        /// * An expired change token restarts the fetch without a token.
        /// * An error carrying a retry delay schedules a new operation after that delay.
        /// * Anything else is reported as `.unrecoverableError`.
        ///
        /// - Parameter error: The error to handle.
        private func handleError(_ error: Error) {
            if error.isCloudKitTokenExpiredError {
                // Forget the expired token so that later retries do not reuse it.
                changeToken = nil
                queue.addOperation(configureOperation(changeToken: nil))
            } else if let retryDelay = error.delayIfRetryPossible() {
                queue.schedule(after: .init(Date() + retryDelay)) { [weak self] in
                    // Do not start new work if the subscription was cancelled while waiting.
                    guard let self = self, self.subscriber != nil else { return }
                    self.queue.addOperation(self.configureOperation(changeToken: self.changeToken))
                }
            } else {
                _ = subscriber?.receive(.unrecoverableError(error))
            }
        }
    }
}
// MARK: -
extension FetchDatabaseChangesPublisher.Subscription: Cancellable {
    /// Drops the subscriber so no further events are delivered, then cancels every tracked operation.
    func cancel() {
        subscriber = nil
        operations.forEach { $0.cancel() }
    }
}
extension FetchDatabaseChangesPublisher.Subscription: Subscription {
    /// Starts the initial fetch the first time positive demand is requested.
    ///
    /// Later requests are ignored; `demand` is only used as a start signal.
    ///
    /// - Parameter demand: The demand requested by the subscriber.
    func request(_ demand: Subscribers.Demand) {
        guard subscriber != nil, demand > 0, !didStart else { return }
        let initialOperation = configureOperation(changeToken: changeToken)
        queue.addOperation(initialOperation)
        didStart = true
    }
}
import CloudKit
import Combine
/// A publisher that wraps a `CKFetchRecordZoneChangesOperation` and emits events as the operation reports them.
///
/// The `FetchRecordZoneChangesPublisher` fetches changes from the given record zones. Changed and deleted records
/// are posted individually via the `.recordChanged` and `.recordDeleted` actions.
///
/// The publisher never fails. Errors are either retried by the subscription or posted as actions, because a
/// failure would terminate the stream. Other operations may still be running when an error is posted.
///
/// A completion event is emitted when an operation finishes without an error and no other operation is tracked.
struct FetchRecordZoneChangesPublisher: Publisher {

    // MARK: Types

    typealias Output = Action
    typealias Failure = Never

    /// The events emitted by the publisher.
    enum Action {
        /// Posted when the record zone change token is updated.
        case recordZoneChangeTokenUpdated(zoneID: CKRecordZone.ID, token: CKServerChangeToken?, clientToken: Data?)
        /// Posted when an updated record is received from the server.
        case recordChanged(CKRecord)
        /// Posted when a record has been deleted on the server.
        case recordDeleted(CKRecord.ID, CKRecord.RecordType)
        /// Posted when the fetch is complete for a specific zone. The values are the zone ID, the zone's change
        /// token, the client token, and whether more changes are coming.
        case zoneFetchComplete(CKRecordZone.ID, CKServerChangeToken?, Data?, Bool)
        /// An error occurred and changes for the associated zone will be fetched again.
        case retryingZone(CKRecordZone.ID)
        /// An unrecoverable error occurred. The zone ID is `nil` when the error is not tied to a single zone.
        case unrecoverableError(CKRecordZone.ID?, Error)
    }

    // MARK: Properties

    /// The CloudKit database to perform the operation on.
    private let database: CKDatabase

    /// The queue the operation is added to.
    private let queue: OperationQueue

    /// A dictionary of zones and their respective change tokens.
    private let zoneTokens: [CKRecordZone.ID: CKServerChangeToken?]

    // MARK: Initialization

    /// Creates a new `FetchRecordZoneChangesPublisher`.
    ///
    /// - Parameter database: The CloudKit database to run the operation on.
    /// - Parameter queue: The operation queue responsible for executing the operation.
    /// - Parameter zoneTokens: A dictionary of zones and their respective change tokens.
    ///
    /// - Returns: A new `FetchRecordZoneChangesPublisher`.
    init(in database: CKDatabase, on queue: OperationQueue, zoneTokens: [CKRecordZone.ID: CKServerChangeToken?]) {
        self.zoneTokens = zoneTokens
        self.queue = queue
        self.database = database
    }

    // MARK: Publisher Implementation

    /// Creates a subscription for `subscriber` and hands it over. No work starts until demand is requested.
    func receive<S: Subscriber>(subscriber: S) where S.Failure == Never, S.Input == Action {
        subscriber.receive(
            subscription: Subscription(
                subscriber: subscriber,
                zoneTokens: zoneTokens,
                in: database,
                on: queue
            )
        )
    }
}
// MARK: - Subscription
private extension FetchRecordZoneChangesPublisher {
    /// The subscription wraps the actual operation execution and emits actions to its subscriber.
    final class Subscription<S: Subscriber> where S.Input == Output, S.Failure == Failure {

        // MARK: Properties

        /// The active subscriber receiving input, if any. Set to `nil` on cancellation.
        private var subscriber: S?

        /// The operation queue to execute the operations on.
        private let queue: OperationQueue

        /// The CloudKit database to run the operations against.
        private let database: CKDatabase

        /// All in-flight operations. An operation is removed when its completion block runs.
        private var operations: [CKFetchRecordZoneChangesOperation] = []

        /// A list of zones and their respective tokens. May change as zones are retried.
        private var zoneTokens: [CKRecordZone.ID: CKServerChangeToken?]

        /// `true` if all changes should be fetched. The consumer is responsible for kicking off more operations if
        /// necessary to collect all data.
        private let fetchAllChanges: Bool

        /// Internal state, makes sure we only start one operation initially.
        private var didStart: Bool = false

        // MARK: Initialization

        /// Creates a new `FetchRecordZoneChangesPublisher.Subscription`.
        ///
        /// - Parameter subscriber: The subscriber to notify.
        /// - Parameter zoneTokens: A list of zones and their respective tokens. May change as zones are retried.
        /// - Parameter fetchAllChanges: `true` if all changes should be fetched. The consumer is responsible for
        ///   kicking off more operations if necessary to collect all data.
        /// - Parameter database: The CloudKit database to perform the operation on.
        /// - Parameter queue: An operation queue to run the operation.
        ///
        /// - Returns: A new `FetchRecordZoneChangesPublisher.Subscription`.
        init(
            subscriber: S,
            zoneTokens: [CKRecordZone.ID: CKServerChangeToken?],
            fetchAllChanges: Bool = true,
            in database: CKDatabase,
            on queue: OperationQueue
        ) {
            self.subscriber = subscriber
            self.database = database
            self.zoneTokens = zoneTokens
            self.fetchAllChanges = fetchAllChanges
            self.queue = queue
        }

        // MARK: Operation

        /// Configures an operation and sets up the callbacks to send events.
        ///
        /// The operation is recorded in `operations` but is not added to the queue; that is the caller's job.
        ///
        /// - Parameter zoneTokens: A list of zones and their respective tokens.
        ///
        /// - Returns: A new `CKFetchRecordZoneChangesOperation`.
        private func configureOperation(
            zoneTokens: [CKRecordZone.ID: CKServerChangeToken?]
        ) -> CKFetchRecordZoneChangesOperation {
            let configurations = zoneTokens.mapValues { token in
                CKFetchRecordZoneChangesOperation.ZoneConfiguration(
                    previousServerChangeToken: token,
                    resultsLimit: nil,
                    desiredKeys: nil
                )
            }
            let operation = CKFetchRecordZoneChangesOperation(
                recordZoneIDs: Array(zoneTokens.keys),
                configurationsByRecordZoneID: configurations
            )
            operation.fetchAllChanges = fetchAllChanges
            operation.database = database
            operation.qualityOfService = .userInitiated

            operation.recordZoneChangeTokensUpdatedBlock = { [weak self] zoneID, token, clientToken in
                // Keep the latest token so that a delayed retry resumes from it.
                self?.zoneTokens[zoneID] = token
                _ = self?.subscriber?.receive(.recordZoneChangeTokenUpdated(
                    zoneID: zoneID,
                    token: token,
                    clientToken: clientToken
                ))
            }
            operation.recordChangedBlock = { [weak self] record in
                _ = self?.subscriber?.receive(.recordChanged(record))
            }
            operation.recordWithIDWasDeletedBlock = { [weak self] id, type in
                _ = self?.subscriber?.receive(.recordDeleted(id, type))
            }
            operation.recordZoneFetchCompletionBlock = { [weak self] zoneID, changeToken, clientToken, moreComing, error in
                if let error = error {
                    self?.handleError(error, for: zoneID)
                } else {
                    _ = self?.subscriber?.receive(.zoneFetchComplete(zoneID, changeToken, clientToken, moreComing))
                }
            }
            operation.fetchRecordZoneChangesCompletionBlock = { [weak self] error in
                guard let self = self else { return }
                if let index = self.operations.firstIndex(of: operation) {
                    self.operations.remove(at: index)
                }
                if let error = error {
                    self.handleError(error, for: nil)
                } else if self.operations.isEmpty {
                    // Nothing else is in flight, so the stream is done.
                    self.subscriber?.receive(completion: .finished)
                }
            }

            operations.append(operation)
            return operation
        }

        /// Handle CloudKit errors.
        ///
        /// * An expired change token for a known zone resets that zone's token and refetches the zone.
        /// * An error carrying a retry delay for a known zone schedules a refetch of the zone after that delay.
        /// * Anything else, including every error without a zone ID, is reported as `.unrecoverableError`.
        ///
        /// - Parameter error: The error to handle.
        /// - Parameter zoneID: The ID of the zone in which the error occurred.
        private func handleError(_ error: Error, for zoneID: CKRecordZone.ID?) {
            if error.isCloudKitTokenExpiredError, let zoneID = zoneID {
                // Store an explicit `nil` token (keeping the key) so later retries do not reuse the expired one.
                zoneTokens.updateValue(nil, forKey: zoneID)
                _ = subscriber?.receive(.recordZoneChangeTokenUpdated(zoneID: zoneID, token: nil, clientToken: nil))
                _ = subscriber?.receive(.retryingZone(zoneID))
                queue.addOperation(configureOperation(zoneTokens: [zoneID: nil]))
            } else if let retryDelay = error.delayIfRetryPossible(), let zoneID = zoneID {
                queue.schedule(after: .init(Date() + retryDelay)) { [weak self] in
                    // Do not start new work if the subscription was cancelled while waiting.
                    guard let self = self, self.subscriber != nil else { return }
                    let token = self.zoneTokens[zoneID] ?? nil
                    self.queue.addOperation(self.configureOperation(zoneTokens: [zoneID: token]))
                }
            } else {
                _ = subscriber?.receive(.unrecoverableError(zoneID, error))
            }
        }
    }
}
// MARK: -
extension FetchRecordZoneChangesPublisher.Subscription: Cancellable {
    /// Drops the subscriber so no further events are delivered, then cancels every tracked operation.
    func cancel() {
        subscriber = nil
        operations.forEach { $0.cancel() }
    }
}
extension FetchRecordZoneChangesPublisher.Subscription: Subscription {
    /// Starts the initial fetch the first time positive demand is requested.
    ///
    /// Later requests are ignored; `demand` is only used as a start signal.
    ///
    /// - Parameter demand: The demand requested by the subscriber.
    func request(_ demand: Subscribers.Demand) {
        guard subscriber != nil, demand > 0, !didStart else { return }
        let initialOperation = configureOperation(zoneTokens: zoneTokens)
        queue.addOperation(initialOperation)
        didStart = true
    }
}
import CloudKit
import Combine
/// The `ModifyRecordsPublisher` takes an array of records to save and/or record IDs to delete and syncs those
/// with iCloud through a `CKModifyRecordsOperation`.
struct ModifyRecordsPublisher: Publisher {

    // MARK: Types

    typealias Output = Action
    typealias Failure = Never

    /// The events emitted by the publisher.
    enum Action {
        /// Upload progress was updated for a record.
        case didUpdateProgress(CKRecord, Double)
        /// A conflict failed to resolve.
        case failedToResolveConflict
        /// A record failed to save.
        case failedToSaveRecord(CKRecord, Error)
        /// A record was saved.
        case recordSaved(CKRecord)
        /// Records were deleted.
        case recordsDeleted([CKRecord.ID])
        /// Sync operation completed.
        case syncCompleted
        /// An unknown error occurred.
        case unknownError(Error)
        /// Upload limit exceeded. Need to batch items.
        case limitExceeded
    }

    // MARK: Properties

    /// The CloudKit database to perform the operation on.
    private var database: CKDatabase

    /// An array of records to save.
    private var recordsToSave: [CKRecord]?

    /// An array of record IDs to delete.
    private var recordIDsToDelete: [CKRecord.ID]?

    /// The queue the operation is added to.
    private let queue: OperationQueue

    /// A closure that can resolve a conflict between two records.
    private let resolver: (_ client: CKRecord, _ server: CKRecord) -> CKRecord?

    // MARK: Initialization

    /// Creates a new `ModifyRecordsPublisher`.
    ///
    /// - Parameter recordsToSave: An array of records to save.
    /// - Parameter recordIDsToDelete: An array of record IDs to delete.
    /// - Parameter database: The CloudKit database to perform the operation on.
    /// - Parameter queue: A queue for the operation.
    /// - Parameter resolver: A closure that can resolve a conflict between two records.
    /// - Parameter client: The client's version of the record.
    /// - Parameter server: The server's version of the record.
    ///
    /// - Returns: A new `ModifyRecordsPublisher`.
    init(
        recordsToSave: [CKRecord]?,
        recordIDsToDelete: [CKRecord.ID]?,
        database: CKDatabase,
        queue: OperationQueue,
        resolver: @escaping (_ client: CKRecord, _ server: CKRecord) -> CKRecord?
    ) {
        self.recordsToSave = recordsToSave
        self.recordIDsToDelete = recordIDsToDelete
        self.database = database
        self.queue = queue
        self.resolver = resolver
    }

    // MARK: Publisher

    /// Creates a subscription for `subscriber` and hands it over. No work starts until demand is requested.
    func receive<S: Subscriber>(subscriber: S) where S.Failure == Failure, S.Input == Output {
        subscriber.receive(
            subscription: Subscription(
                subscriber: subscriber,
                recordsToSave: recordsToSave,
                recordIDsToDelete: recordIDsToDelete,
                database: database,
                queue: queue,
                resolver: resolver
            )
        )
    }
}
// MARK: - Subscription
extension ModifyRecordsPublisher {
    /// The subscription that wraps the actual operation execution and emits actions to its subscriber.
    final class Subscription<S: Subscriber> where S.Input == Output, S.Failure == Failure {

        // MARK: Properties

        /// The subscriber to notify of events. Set to `nil` on cancellation.
        private var subscriber: S?

        /// The CloudKit database to perform the operation on.
        private var database: CKDatabase

        /// Records waiting to be saved. Holds the initial records until the first operation is created and, after
        /// that, any records produced by conflict resolution.
        private var recordsToSave: [CKRecord]?

        /// An array of record IDs to delete.
        private var recordIDsToDelete: [CKRecord.ID]?

        /// A queue for the operation.
        private let queue: OperationQueue

        /// A closure that can resolve a conflict between two records.
        private let resolver: (_ client: CKRecord, _ server: CKRecord) -> CKRecord?

        /// Any in-flight operations. An operation is removed when its completion block runs.
        private var operations: [CKModifyRecordsOperation] = []

        // MARK: Initialization

        /// Creates a new `ModifyRecordsPublisher.Subscription`.
        ///
        /// - Parameter subscriber: The subscriber to notify.
        /// - Parameter recordsToSave: An array of records to save.
        /// - Parameter recordIDsToDelete: An array of record IDs to delete.
        /// - Parameter database: The CloudKit database to perform the operation on.
        /// - Parameter queue: A queue for the operation.
        /// - Parameter resolver: A closure that can resolve a conflict between two records.
        /// - Parameter client: The client's version of the record.
        /// - Parameter server: The server's version of the record.
        ///
        /// - Returns: A new `ModifyRecordsPublisher.Subscription`.
        init(
            subscriber: S,
            recordsToSave: [CKRecord]?,
            recordIDsToDelete: [CKRecord.ID]?,
            database: CKDatabase,
            queue: OperationQueue,
            resolver: @escaping (_ client: CKRecord, _ server: CKRecord) -> CKRecord?
        ) {
            self.subscriber = subscriber
            self.database = database
            self.queue = queue
            self.recordsToSave = recordsToSave
            self.recordIDsToDelete = recordIDsToDelete
            self.resolver = resolver
        }

        // MARK: Operation

        /// Creates and configures an operation.
        ///
        /// Might be called multiple times to retry resolved conflicts or retry other errors. The operation is
        /// neither queued nor recorded in `operations` here; both are the caller's job.
        ///
        /// - Parameter recordsToSave: An array of records to save.
        /// - Parameter recordIDsToDelete: An array of record IDs to delete.
        ///
        /// - Returns: A `CKModifyRecordsOperation`.
        private func configureOperation(
            recordsToSave: [CKRecord]?,
            recordIDsToDelete: [CKRecord.ID]?
        ) -> CKModifyRecordsOperation {
            let operation = CKModifyRecordsOperation(recordsToSave: recordsToSave, recordIDsToDelete: recordIDsToDelete)
            operation.savePolicy = .changedKeys
            operation.qualityOfService = .userInitiated
            operation.database = database

            operation.perRecordProgressBlock = { [weak self] record, progress in
                _ = self?.subscriber?.receive(.didUpdateProgress(record, progress))
            }
            operation.perRecordCompletionBlock = { [weak self] record, error in
                guard let error = error else {
                    _ = self?.subscriber?.receive(.recordSaved(record))
                    return
                }
                self?.handleRecordError(record: record, error: error)
            }
            operation.modifyRecordsCompletionBlock = { [weak self] _, deleted, error in
                guard let self = self else { return }
                if let index = self.operations.firstIndex(of: operation) {
                    self.operations.remove(at: index)
                }
                if let error = error {
                    self.handleUploadError(
                        error: error,
                        recordsToSave: recordsToSave,
                        recordIDsToDelete: recordIDsToDelete
                    )
                    return
                }
                // Always report deletions, even when resolved conflicts still need another round trip.
                if let deleted = deleted {
                    _ = self.subscriber?.receive(.recordsDeleted(deleted))
                }
                if !self.resubmitResolvedRecords(), self.operations.isEmpty {
                    _ = self.subscriber?.receive(.syncCompleted)
                }
            }
            return operation
        }

        /// Starts a follow-up operation for the records queued by conflict resolution, if there are any.
        ///
        /// - Returns: `true` if a follow-up operation was started.
        private func resubmitResolvedRecords() -> Bool {
            guard subscriber != nil, let resolved = recordsToSave, !resolved.isEmpty else { return false }
            recordsToSave = nil
            let followUp = configureOperation(recordsToSave: resolved, recordIDsToDelete: nil)
            // Track the operation before queueing it so that `cancel()` can reach it.
            operations.append(followUp)
            queue.addOperation(followUp)
            return true
        }

        /// Handle a single record error.
        ///
        /// - Parameter record: The record for which the error occurred.
        /// - Parameter error: The error that occurred.
        private func handleRecordError(record: CKRecord, error: Error) {
            // We only care about conflict errors here. Other errors like failed uploads are handled by the
            // completion block.
            guard error.isCloudKitConflictError else {
                _ = subscriber?.receive(.failedToSaveRecord(record, error))
                return
            }
            // Attempt to resolve conflict.
            guard let resolved = error.resolveConflict(with: resolver) else {
                _ = subscriber?.receive(.failedToResolveConflict)
                return
            }
            // Queue up the resolved record to be saved. `recordsToSave` is `nil` once the first operation has been
            // created, so the array is re-created here; optional chaining would silently drop the record.
            recordsToSave = (recordsToSave ?? []) + [resolved]
        }

        /// Handle errors reported for a whole operation.
        ///
        /// * Non-CloudKit errors are reported as `.unknownError` and finish the stream.
        /// * `.limitExceeded` is reported so that the consumer can batch its items.
        /// * `.partialFailure` resubmits records queued by conflict resolution; the individual record errors were
        ///   already reported by the per-record completion block.
        /// * Errors carrying a retry delay schedule a retry of the same records after that delay.
        ///
        /// - Parameter error: The error to handle.
        /// - Parameter recordsToSave: An array of the records that failed to save.
        /// - Parameter recordIDsToDelete: An array of the record IDs that could not be deleted.
        private func handleUploadError(error: Error, recordsToSave: [CKRecord]?, recordIDsToDelete: [CKRecord.ID]?) {
            guard let ckError = error as? CKError else {
                _ = subscriber?.receive(.unknownError(error))
                subscriber?.receive(completion: .finished)
                return
            }
            if ckError.code == .limitExceeded {
                _ = subscriber?.receive(.limitExceeded)
            } else if ckError.code == .partialFailure, resubmitResolvedRecords() {
                // Resolved conflicts are on their way to the server again; nothing more to do here.
            } else if let retryDelay = ckError.retryAfterSeconds {
                let newOperation = configureOperation(recordsToSave: recordsToSave, recordIDsToDelete: recordIDsToDelete)
                operations.append(newOperation)
                queue.schedule(after: .init(Date() + retryDelay)) { [weak self] in
                    // Do not start new work if the subscription was cancelled while waiting.
                    guard let self = self, self.subscriber != nil else { return }
                    self.queue.addOperation(newOperation)
                }
            }
        }
    }
}
// MARK: -
extension ModifyRecordsPublisher.Subscription: Cancellable {
    /// Drops the subscriber so no further events are delivered, then cancels every tracked operation.
    func cancel() {
        subscriber = nil
        operations.forEach { $0.cancel() }
    }
}
extension ModifyRecordsPublisher.Subscription: Subscription {
    /// Starts the initial modify operation when positive demand is requested and nothing is in flight.
    ///
    /// The pending records and record IDs are handed to the operation and then cleared from the subscription.
    ///
    /// - Parameter demand: The demand requested by the subscriber. Only used as a start signal.
    func request(_ demand: Subscribers.Demand) {
        guard subscriber != nil, operations.isEmpty, demand > 0 else { return }
        let operation = configureOperation(recordsToSave: recordsToSave, recordIDsToDelete: recordIDsToDelete)
        recordsToSave = nil
        recordIDsToDelete = nil
        // Track the operation so `cancel()` can cancel it and a second request does not start a duplicate while
        // it is still running. Its completion block removes it from `operations` again.
        operations.append(operation)
        queue.addOperation(operation)
    }
}
import CloudKit
import Combine
/// A publisher that wraps a `CKModifyRecordZonesOperation` and emits an event when the operation completes.
public struct ModifyRecordZonesPublisher: Publisher {

    // MARK: Types

    public typealias Output = Action
    public typealias Failure = CloudKitClientError

    /// The events emitted by the publisher.
    public enum Action: Equatable {
        /// The operation completed. The associated values contain the zones that were created and deleted.
        case operationCompleted(created: [CKRecordZone]?, deleted: [CKRecordZone.ID]?)
    }

    // MARK: Properties

    /// The zones to create.
    private let zones: [CKRecordZone]

    /// The IDs of the zones to delete.
    private let delete: [CKRecordZone.ID]

    /// The database to modify the zones in.
    private let database: CKDatabase

    /// A queue for the operation. If `nil` the operation is added directly to the database.
    private let queue: OperationQueue?

    // MARK: Initialization

    /// Creates a new `ModifyRecordZonesPublisher`.
    ///
    /// - Parameter create: The record zones to create.
    /// - Parameter delete: The IDs of the record zones to delete. Defaults to none.
    /// - Parameter database: The database to modify the zones in.
    /// - Parameter queue: An optional operation queue to run the operations.
    ///
    /// - Returns: A new `ModifyRecordZonesPublisher`.
    public init(
        create: [CKRecordZone],
        delete: [CKRecordZone.ID] = [],
        in database: CKDatabase,
        on queue: OperationQueue? = nil
    ) {
        self.zones = create
        self.delete = delete
        self.queue = queue
        self.database = database
    }

    // MARK: Publisher Implementation

    /// Creates a subscription for `subscriber` and hands it over. No work starts until demand is requested.
    public func receive<S: Subscriber>(subscriber: S) where S.Failure == Failure, S.Input == Output {
        subscriber.receive(
            subscription: Subscription(
                subscriber: subscriber,
                zones: zones,
                toDelete: delete,
                in: database,
                on: queue
            )
        )
    }
}
// MARK: - Subscription
private extension ModifyRecordZonesPublisher {
    /// The subscription that wraps the actual operation execution and emits actions to its subscriber.
    final class Subscription<S: Subscriber> where S.Input == Output, S.Failure == Failure {

        // MARK: Properties

        /// The subscriber to notify of events. Set to `nil` on cancellation.
        private var subscriber: S?

        /// The database to modify the zones in.
        private let database: CKDatabase

        /// An operation queue to run the operation. If `nil` the operation is added directly to the database.
        private let queue: OperationQueue?

        /// The most recently configured operation, if any.
        private var operation: CKModifyRecordZonesOperation?

        /// The zones to create.
        private let zones: [CKRecordZone]

        /// The zone IDs to delete.
        private let delete: [CKRecordZone.ID]

        // MARK: Initialization

        /// Creates a new `ModifyRecordZonesPublisher.Subscription`.
        ///
        /// - Parameter subscriber: The subscriber to notify.
        /// - Parameter zones: The zones to create.
        /// - Parameter delete: The zone IDs to delete.
        /// - Parameter database: The database to modify the zones in.
        /// - Parameter queue: An operation queue to run the operation.
        ///
        /// - Returns: A new `ModifyRecordZonesPublisher.Subscription`.
        init(
            subscriber: S,
            zones: [CKRecordZone],
            toDelete delete: [CKRecordZone.ID],
            in database: CKDatabase,
            on queue: OperationQueue?
        ) {
            self.subscriber = subscriber
            self.database = database
            self.queue = queue
            self.zones = zones
            self.delete = delete
        }

        // MARK: Operation

        /// Configures the operation, sets up the callbacks to send events, and submits it.
        ///
        /// The operation goes to `queue` when one was provided and directly to the database otherwise.
        private func configureOperation() {
            let operation = CKModifyRecordZonesOperation(recordZonesToSave: zones, recordZoneIDsToDelete: delete)
            operation.database = database
            operation.qualityOfService = .utility
            operation.modifyRecordZonesCompletionBlock = { [weak self] zones, deleted, error in
                if let error = error as? CKError {
                    self?.handleError(error)
                } else if let error = error.map({ $0 as NSError }) {
                    self?.subscriber?.receive(completion: .failure(.unknownError(error)))
                } else {
                    _ = self?.subscriber?.receive(.operationCompleted(created: zones, deleted: deleted))
                    self?.subscriber?.receive(completion: .finished)
                }
            }
            self.operation = operation
            if let queue = queue {
                queue.addOperation(operation)
            } else {
                database.add(operation)
            }
        }

        /// Handle CloudKit errors.
        ///
        /// An error carrying a retry delay is retried after that delay, but only when an operation queue was
        /// provided (the queue doubles as the scheduler). Every other error fails the stream.
        ///
        /// - Parameter error: The error to handle.
        private func handleError(_ error: CKError) {
            if let retryDelay = error.delayIfRetryPossible(), let queue = queue {
                queue.schedule(after: .init(Date() + retryDelay)) { [weak self] in
                    // Do not modify zones on the server if the subscription was cancelled while waiting.
                    guard let self = self, self.subscriber != nil else { return }
                    self.configureOperation()
                }
            } else {
                subscriber?.receive(completion: .failure(.cloudKitError(error)))
            }
        }
    }
}
// MARK: -
extension ModifyRecordZonesPublisher.Subscription: Cancellable {
    /// Drops the subscriber so no further events are delivered, then cancels the current operation, if any.
    func cancel() {
        subscriber = nil
        if let inFlight = operation {
            inFlight.cancel()
        }
    }
}
extension ModifyRecordZonesPublisher.Subscription: Subscription {
    /// Configures and submits the operation the first time positive demand is requested.
    ///
    /// Nothing happens when the subscription was cancelled or an operation has already been configured.
    ///
    /// - Parameter demand: The demand requested by the subscriber. Only used as a start signal.
    func request(_ demand: Subscribers.Demand) {
        let isActive = subscriber != nil
        let hasNotStarted = operation == nil
        if isActive && hasNotStarted && demand > 0 {
            configureOperation()
        }
    }
}
import CloudKit
import Combine
/// A `RecordSubscriptionPublisher` wraps a `CKModifySubscriptionsOperation` and emits an action once the
/// subscription has been saved.
public struct RecordSubscriptionPublisher: Publisher {

    // MARK: Types

    public typealias Output = Action
    public typealias Failure = CloudKitClientError

    /// The events emitted by the publisher.
    public enum Action: Equatable {
        /// The subscription was saved. The zone ID is `nil` for a database subscription.
        case savedSubscription(CKRecordZone.ID?, CKSubscription.ID)
    }

    // MARK: Properties

    /// The CloudKit database to perform the operation on.
    private let database: CKDatabase

    /// An operation queue to run the operation.
    private let queue: OperationQueue

    /// The id of the zone to create the subscription for, or `nil` to subscribe to the whole database.
    private let zoneID: CKRecordZone.ID?

    /// The id of the subscription to create.
    private let subscriptionID: CKSubscription.ID

    // MARK: Initialization

    /// Creates a new `RecordSubscriptionPublisher`.
    ///
    /// - Parameter database: The CloudKit database to perform the operation on.
    /// - Parameter queue: An operation queue to run the operation.
    /// - Parameter zoneID: The id of the zone to create the subscription for. Defaults to `nil`, which creates a
    ///   database subscription.
    /// - Parameter subscriptionID: The id of the subscription to create.
    ///
    /// - Returns: A new `RecordSubscriptionPublisher`.
    public init(
        in database: CKDatabase,
        on queue: OperationQueue,
        zoneID: CKRecordZone.ID? = nil,
        subscriptionID: CKSubscription.ID
    ) {
        self.subscriptionID = subscriptionID
        self.zoneID = zoneID
        self.queue = queue
        self.database = database
    }

    // MARK: Publisher Implementation

    /// Creates a subscription for `subscriber` and hands it over. No work starts until demand is requested.
    public func receive<S: Subscriber>(subscriber: S) where S.Failure == Failure, S.Input == Output {
        subscriber.receive(
            subscription: Subscription(
                subscriber: subscriber,
                database: database,
                queue: queue,
                zoneID: zoneID,
                subscriptionID: subscriptionID
            )
        )
    }
}
// MARK: - Subscription
private extension RecordSubscriptionPublisher {
    /// The subscription that wraps the actual operation execution and emits actions to its subscriber.
    final class Subscription<S: Subscriber> where S.Input == Output, S.Failure == Failure {

        // MARK: Properties

        /// The subscriber to notify of events. Set to `nil` on cancellation.
        private var subscriber: S?

        /// The most recently configured operation, if any.
        private var operation: CKModifySubscriptionsOperation?

        /// The CloudKit database to perform the operation on.
        private let database: CKDatabase

        /// An operation queue to run the operation.
        private let queue: OperationQueue

        /// The id of the zone to create the subscription for, or `nil` for a database subscription.
        private let zoneID: CKRecordZone.ID?

        /// The id of the subscription to create.
        private let subscriptionID: CKSubscription.ID

        // MARK: Initialization

        /// Creates a new `RecordSubscriptionPublisher.Subscription`.
        ///
        /// - Parameter subscriber: The subscriber to notify.
        /// - Parameter database: The CloudKit database to perform the operation on.
        /// - Parameter queue: An operation queue to run the operation.
        /// - Parameter zoneID: The id of the zone to create the subscription for.
        /// - Parameter subscriptionID: The id of the subscription to create.
        ///
        /// - Returns: A new `RecordSubscriptionPublisher.Subscription`.
        init(
            subscriber: S,
            database: CKDatabase,
            queue: OperationQueue,
            zoneID: CKRecordZone.ID?,
            subscriptionID: CKSubscription.ID
        ) {
            self.subscriber = subscriber
            self.database = database
            self.queue = queue
            self.zoneID = zoneID
            self.subscriptionID = subscriptionID
        }

        // MARK: Operations

        /// Creates, configures, and queues a `CKModifySubscriptionsOperation` to communicate with CloudKit.
        ///
        /// A zone subscription is created when `zoneID` is given, a database subscription otherwise.
        ///
        /// - Parameter zoneID: The id of the zone to create the subscription for.
        /// - Parameter subscriptionID: The id of the subscription to create.
        private func configureOperation(zoneID: CKRecordZone.ID?, subscriptionID: CKSubscription.ID) {
            let subscription: CKSubscription
            if let zoneID = zoneID {
                subscription = CKRecordZoneSubscription(zoneID: zoneID, subscriptionID: subscriptionID)
            } else {
                subscription = CKDatabaseSubscription(subscriptionID: subscriptionID)
            }

            let notificationInfo = CKSubscription.NotificationInfo()
            notificationInfo.shouldSendContentAvailable = true
            // NOTE(review): "hej" looks like a leftover debug string; confirm whether an alert body is intended
            // alongside the content-available flag.
            notificationInfo.alertBody = "hej"
            subscription.notificationInfo = notificationInfo

            let operation = CKModifySubscriptionsOperation(subscriptionsToSave: [subscription], subscriptionIDsToDelete: nil)
            operation.database = database
            operation.qualityOfService = .userInitiated
            operation.modifySubscriptionsCompletionBlock = { [weak self] _, _, error in
                if let error = error as? CKError {
                    self?.handleError(error: error)
                } else if let error = error.map({ $0 as NSError }) {
                    self?.subscriber?.receive(completion: .failure(.unknownError(error)))
                } else {
                    _ = self?.subscriber?.receive(.savedSubscription(zoneID, subscriptionID))
                    self?.subscriber?.receive(completion: .finished)
                }
            }
            self.operation = operation
            queue.addOperation(operation)
        }

        /// Handle CloudKit errors.
        ///
        /// An error carrying a retry delay is retried after that delay; every other error fails the stream.
        ///
        /// - Parameter error: The error to handle.
        private func handleError(error: CKError) {
            if let retryDelay = error.delayIfRetryPossible() {
                queue.schedule(after: .init(Date() + retryDelay)) { [weak self] in
                    // Do not save the subscription if this subscription was cancelled while waiting.
                    guard let self = self, self.subscriber != nil else { return }
                    self.configureOperation(zoneID: self.zoneID, subscriptionID: self.subscriptionID)
                }
            } else {
                subscriber?.receive(completion: .failure(.cloudKitError(error)))
            }
        }
    }
}
// MARK: -
extension RecordSubscriptionPublisher.Subscription: Cancellable {
    /// Drops the subscriber so no further events are delivered, then cancels the current operation, if any.
    func cancel() {
        subscriber = nil
        if let inFlight = operation {
            inFlight.cancel()
        }
    }
}
extension RecordSubscriptionPublisher.Subscription: Subscription {
    /// Configures and queues the operation the first time positive demand is requested.
    ///
    /// Nothing happens when the subscription was cancelled or an operation has already been configured.
    ///
    /// - Parameter demand: The demand requested by the subscriber. Only used as a start signal.
    func request(_ demand: Subscribers.Demand) {
        let isActive = subscriber != nil
        let hasNotStarted = operation == nil
        if isActive && hasNotStarted && demand > 0 {
            configureOperation(zoneID: zoneID, subscriptionID: subscriptionID)
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment