Skip to content

Instantly share code, notes, and snippets.

@simme
Created December 6, 2022 08:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save simme/3d790dbb89e5bbcf74d7603f509ecda5 to your computer and use it in GitHub Desktop.
Save simme/3d790dbb89e5bbcf74d7603f509ecda5 to your computer and use it in GitHub Desktop.
import CloudKit
import Combine
/// A publisher that wraps a `CKFetchRecordZoneChangesOperation` and emits events as the operation completes.
///
/// The `FetchRecordZoneChangesPublisher` fetches changes from the given record zones. New and deleted records are
/// posted individually via the `.recordChanged` and `.recordDeleted` actions.
///
/// Errors are automatically retried if possible. Resetting the change token in case it expired is also automatically
/// handled. Because emitting errors fails a publisher all errors are posted as actions. There may still be running
/// operations, even if an error occurs.
///
/// The publisher emits a completion event once all running operations have finished.
struct FetchRecordZoneChangesPublisher: Publisher {
// MARK: Types
/// The events emitted by the publisher.
enum Action {
/// Posted when the record zone change token is updated.
case recordZoneChangeTokenUpdated(zoneID: CKRecordZone.ID, token: CKServerChangeToken?, clientToken: Data?)
/// Posted when an updated record is received from the server.
case recordChanged(CKRecord)
/// Posted when a record has been deleted on the server.
case recordDeleted(CKRecord.ID, CKRecord.RecordType)
/// Posted when the fetch is complete for a specific zone.
case zoneFetchComplete(CKRecordZone.ID, CKServerChangeToken?, Data?, Bool)
/// An error occured and changes for the associated zone will be fetched again.
case retryingZone(CKRecordZone.ID)
/// An unrecoverable error occured.
case unrecoverableError(CKRecordZone.ID?, Error)
}
// MARK: Properties
/// The CloudKit database to perform the operatoin on.
private let database: CKDatabase
/// A queue for the operation.
private let queue: OperationQueue
/// A dictionary of zones and their respective change tokens.
private let zoneTokens: [CKRecordZone.ID: CKServerChangeToken?]
// MARK: Initialization
/// Creates a new `FetchRecordZoneChangesPublisher`.
///
/// - Parameter database: The cloud kit database to run the operation on.
/// - Parameter queue: The operation queue responsible for executing the operation.
/// - Parameter zoneTokens: A dictionary of zones and their respective change tokens.
///
/// - Returns: A new `FetchRecordZoneChangesPublisher`.
init(in database: CKDatabase, on queue: OperationQueue, zoneTokens: [CKRecordZone.ID: CKServerChangeToken?]) {
self.database = database
self.queue = queue
self.zoneTokens = zoneTokens
}
// MARK: Publisher Implementation
func receive<S>(subscriber: S) where S: Subscriber, Never == S.Failure, Action == S.Input {
let subscription = Subscription(
subscriber: subscriber,
zoneTokens: zoneTokens,
in: database,
on: queue
)
subscriber.receive(subscription: subscription)
}
typealias Output = Action
typealias Failure = Never
}
// MARK: - Subscription
private extension FetchRecordZoneChangesPublisher {
/// The subscription wraps the actual operation execution and emits actions to its subscriber.
final class Subscription<S: Subscriber> where S.Input == Output, S.Failure == Failure {
// MARK: Properties
/// The active subscriber receiving input, if any.
private var subscriber: S?
/// The operation queue to execute the operations on.
private let queue: OperationQueue
/// The cloud kit database to run the operations against.
private let database: CKDatabase
/// All in-flight operations.
private var operations: [CKFetchRecordZoneChangesOperation] = []
/// A list of zones and their respective tokens. May change as zones are retried.
private var zoneTokens: [CKRecordZone.ID: CKServerChangeToken?]
/// `true` if all changes should be fetched. The consumer is responsible for kicking off more operations if
/// necessary to collect all data.
private let fetchAllChanges: Bool
/// Internal state, makes sure we only start one operation initially.
private var didStart: Bool = false
// MARK: Initialization
/// Creates a new `FetchRecordZoneChangesPublisher.Subscription`.
///
/// - Parameter subscriber: The subscriber to notify.
/// - Parameter zoneTkens: A list of zones and their respective tokens. May change as zones are retried.
/// - Parameter fetchAllChanges: `true` if all changes should be fetched. The consumer is responsible for kicking
/// off more operations if necessary to collect all data.
/// - Parameter database: The CloudKit database to perform the operation on.
/// - Parameter queue: An operation queue to run the operation.
///
/// - Returns: A new `FetchRecordZoneChangesPublisher.Subscriber`.
init(
subscriber: S,
zoneTokens: [CKRecordZone.ID: CKServerChangeToken?],
fetchAllChanges: Bool = true,
in database: CKDatabase,
on queue: OperationQueue
) {
self.subscriber = subscriber
self.database = database
self.zoneTokens = zoneTokens
self.fetchAllChanges = fetchAllChanges
self.queue = queue
}
/// Configures an operation and sets up the callbacks to send events.
///
/// - Parameter zoneTokens: A list of zones and their respective tokens.
///
/// - Returns: A new `CKFetchRecordZoneChangesOperation`.
private func configureOperation(
zoneTokens: [CKRecordZone.ID: CKServerChangeToken?]
) -> CKFetchRecordZoneChangesOperation {
let configurations = Dictionary(uniqueKeysWithValues: zoneTokens.map { id, token in
(id, CKFetchRecordZoneChangesOperation.ZoneConfiguration(
previousServerChangeToken: token,
resultsLimit: nil,
desiredKeys: nil
))
})
let operation = CKFetchRecordZoneChangesOperation(
recordZoneIDs: Array(zoneTokens.keys),
configurationsByRecordZoneID: configurations
)
operation.fetchAllChanges = fetchAllChanges
operation.database = database
operation.qualityOfService = .userInitiated
operation.recordZoneChangeTokensUpdatedBlock = { [weak self] zoneID, token, clientToken in
self?.zoneTokens[zoneID] = token
_ = self?.subscriber?.receive(.recordZoneChangeTokenUpdated(
zoneID: zoneID,
token: token,
clientToken: clientToken
))
}
operation.recordChangedBlock = { [weak self] record in
_ = self?.subscriber?.receive(.recordChanged(record))
}
operation.recordWithIDWasDeletedBlock = { [weak self] id, type in
_ = self?.subscriber?.receive(.recordDeleted(id, type))
}
operation.recordZoneFetchCompletionBlock = { [weak self] zoneID, changeToken, clientToken, moreComing, error in
if let error = error {
self?.handleError(error, for: zoneID)
} else {
_ = self?.subscriber?.receive(.zoneFetchComplete(zoneID, changeToken, clientToken, moreComing))
}
}
operation.fetchRecordZoneChangesCompletionBlock = { [weak self] error in
guard let strongSelf = self else { return }
_ = strongSelf.operations.firstIndex(of: operation).map { strongSelf.operations.remove(at: $0) }
if let error = error {
strongSelf.handleError(error, for: nil)
} else {
if strongSelf.operations.isEmpty {
self?.subscriber?.receive(completion: .finished)
}
}
}
self.operations.append(operation)
return operation
}
/// Handle CloudKit errors.
///
/// - Parameter error: The error to handle.
/// - Parameter zoneID: The ID of the zone in which the error occured.
private func handleError(_ error: Error, for zoneID: CKRecordZone.ID?) {
if error.isCloudKitTokenExpiredError, let zoneID = zoneID {
_ = subscriber?.receive(.recordZoneChangeTokenUpdated(zoneID: zoneID, token: nil, clientToken: nil))
_ = subscriber?.receive(.retryingZone(zoneID))
let newOperation = configureOperation(zoneTokens: [zoneID: nil])
queue.addOperation(newOperation)
} else if let retryDelay = error.delayIfRetryPossible(), let zoneID = zoneID {
queue.schedule(after: .init(Date() + retryDelay)) { [weak self] in
let token = self?.zoneTokens[zoneID] ?? nil
guard let newOperation = self?.configureOperation(zoneTokens: [zoneID: token]) else { return }
self?.queue.addOperation(newOperation)
}
} else {
_ = subscriber?.receive(.unrecoverableError(zoneID, error))
}
}
}
}
// MARK: -
extension FetchRecordZoneChangesPublisher.Subscription: Cancellable {
func cancel() {
subscriber = nil
for operation in operations {
operation.cancel()
}
}
}
extension FetchRecordZoneChangesPublisher.Subscription: Subscription {
func request(_ demand: Subscribers.Demand) {
guard subscriber != nil else { return }
if demand > 0 && !didStart {
let operation = configureOperation(zoneTokens: zoneTokens)
queue.addOperation(operation)
didStart = true
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment