Created
October 22, 2019 21:55
-
-
Save hyouuu/d9efc663d1e7a153b029c31be20149e4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// CKSyncHandler+SyncHandlerDelegate.swift | |
// Pendo | |
// | |
// Created by Shawn Gong on 5/29/19. | |
// Copyright © 2019 Lychee Isle. All rights reserved. | |
// | |
import CloudKit | |
extension CKSyncHandler: SyncHandlerDelegate { | |
// MARK: Sync Notification | |
func tryProcessSyncNotif(_ userInfo: [AnyHashable : Any]) -> Bool { | |
// Try parsing to CKQueryNotification | |
guard let ckQueryNotif = CKQueryNotification(fromRemoteNotificationDictionary: userInfo), | |
ckQueryNotif.notificationType == .query else { return false } | |
spr("CKQueryNotif:\(ckQueryNotif)") | |
guard let recordID = ckQueryNotif.recordID else { | |
// If no exact info, initiate a full sync schedule | |
Syncer.one.schedule("Received CKQueryNotif") | |
return true | |
} | |
let recordInfo = recordID.recordName | |
guard recordInfo != metaRecordName else { | |
er("recordInfo == metaRecordName: \(metaRecordName)", .cloudKit) | |
return true | |
} | |
if recordInfo == userSettingsRecordName { | |
syncerDelegate?.shouldSyncUserSettings(from: assocProvider, src: "Received CKQueryNotif for UserSettings") | |
return true | |
} | |
syncerDelegate?.pushSyncWillStart() | |
switch ckQueryNotif.queryNotificationReason { | |
case .recordCreated, .recordUpdated: | |
readRecord(recordInfo) { [weak self] obj in | |
defer { | |
self?.syncerDelegate?.pushSyncDidStop() | |
} | |
guard let obj = obj else { | |
er("Expect obj for \(recordInfo)", .cloudKit) | |
return | |
} | |
self?.recordUpserted(recordInfo, obj: obj) | |
} | |
case .recordDeleted: | |
recordDeleted(recordInfo) | |
syncerDelegate?.pushSyncDidStop() | |
@unknown default: | |
er("Unknown case", .cloudKit) | |
syncerDelegate?.pushSyncDidStop() | |
} | |
return true | |
} | |
// MARK: User Settings | |
/// (userSettings, update) | |
func readUserSettings(completion: @escaping ([String: String], Int) -> ()) { | |
readUserSettings(retryDelay: 0.5, completion: completion) | |
} | |
/// (userSettings, update) | |
func readUserSettings(retryDelay: TimeInterval = 0.5, completion: @escaping ([String: String], Int) -> ()) { | |
db.fetch(withRecordID: CKRecord.ID(recordName: userSettingsRecordName)) { [weak self] record, err in | |
guard let s = self else { completion([:], 0); return } | |
// Unless "Not Found", deal with the err and possibly resetMeta | |
if let err = err, !s.isRecordNotFoundErr(err) { | |
let shouldRetry = s.handleErr(err, src: "fetchMeta") | |
if shouldRetry { | |
guard retryDelay <= s.maxRetryDelay else { completion([:], 0); return } | |
spr("readUserSettings retryDelay:\(retryDelay)") | |
delayToBg(retryDelay) { | |
s.readUserSettings(retryDelay: nextRetryDelay(retryDelay), completion: completion) | |
} | |
return | |
} | |
completion([:], 0) | |
return | |
} | |
guard let record = record, let jsonNSData = record[s.jsonDataField] as? NSData else { | |
completion([:], 0) | |
return | |
} | |
var userSettings = [String: String]() | |
let jsonData = Data(referencing: jsonNSData) | |
let decoder = JSONDecoder() | |
do { | |
userSettings = try decoder.decode([String: String].self, from: jsonData) | |
} catch { | |
er("CK readUserSettings can't parse jsonData", .cloudKit) | |
} | |
var update = 0 | |
if let localUpdate = userSettings[UserSetting.localUpdatedAt.rawValue], let val = Double(localUpdate) { | |
update = val.i | |
pr("CK readUserSettings update:\(update) from localUpdate") | |
} else if let val = record.modificationDate?.i { | |
update = val | |
pr("CK readUserSettings update:\(update) from modificationDate") | |
} | |
completion(userSettings, update) | |
} | |
} | |
func upsertUserSettings(_ userSettings: [String: String]) { | |
upsertUserSettings(userSettings, retryDelay: 0.5) | |
} | |
func upsertUserSettings(_ userSettings: [String: String], retryDelay: TimeInterval = 0.5) { | |
spr("CK upsertUserSettings") | |
var jsonData = Data() | |
let encoder = JSONEncoder() | |
do { | |
jsonData = try encoder.encode(userSettings) | |
} catch { | |
er("CK upsertUserSettings can't encode userSettings", .cloudKit) | |
} | |
#if DEBUG | |
guard let jsonString = String(data: jsonData, encoding: .utf8) else { er(); return } | |
pr("CK upsertUserSettings json:\(jsonString)") | |
#endif | |
let recordID = CKRecord.ID(recordName: userSettingsRecordName) | |
db.fetch(withRecordID: recordID) { [weak self] fetchedRecord, err in | |
guard let s = self else { return } | |
// Create if none existing | |
let record = fetchedRecord ?? CKRecord(recordType: RecordType.userSettings.rawValue, recordID: recordID) | |
// Might want to delete existing keys in the future | |
record[s.jsonDataField] = jsonData as NSData | |
s.db.save(record) { [weak self] savedRecord, err in | |
guard let s = self else { return } | |
guard let err = err else { | |
spr("CK upsertUserSettings saved record completed") | |
return | |
} | |
let shouldRetry = s.handleErr(err, src: "updateCKMeta saveRecord") | |
if shouldRetry { | |
guard retryDelay <= s.maxRetryDelay else { return } | |
spr("CK upsertUserSettings retryDelay:\(retryDelay)") | |
delayToBg(retryDelay) { | |
s.upsertUserSettings(userSettings, retryDelay: nextRetryDelay(retryDelay)) | |
} | |
} | |
} | |
} | |
} | |
// MARK: Records | |
// recordNames, deletedRecordInfos | |
func listRecordNames(_ completion: @escaping ([String], OrderedSet<String>) -> ()) { | |
spr("CK listRecordNames") | |
// Need to refetch every time because if we use cached metaRecordNames then there's no way to know what's on cloud. The meta file shouldn't be too big so shouldn't be a problem for now | |
fetchMeta("listRecordNames") { [weak self] finished in | |
guard let s = self else { completion([], []); return } | |
var recordNames = [String]() | |
var deletedRecordInfos = OrderedSet<String>() | |
read { | |
for (recordInfo, update) in s.metaRecordInfoToUpdates { | |
recordNames.append(Record.makeName(recordInfo: recordInfo, update: update)) | |
} | |
deletedRecordInfos = s.deletedRecordInfos | |
spr("CK listRecordNames recordNames count:\(recordNames.count) deleted count:\(deletedRecordInfos.count)") | |
} | |
completion(recordNames, deletedRecordInfos) | |
} | |
} | |
/// [recordInfo : obj] | |
func readRecords(_ recordInfos: [String], completion: @escaping ([String: BaseR]?) -> ()) { | |
readRecords(recordInfos, retryDelay: 0.5, completion: completion) | |
} | |
func readRecord(_ recordInfo: String, completion: @escaping (BaseR?) -> ()) { | |
readRecords([recordInfo]) { recordInfoToObjs in | |
completion(recordInfoToObjs?[recordInfo]) | |
} | |
} | |
// [recordInfo : obj] | |
func readRecords(_ recordInfos: [String], | |
retryDelay: TimeInterval = 0.5, | |
completion: @escaping ([String: BaseR]?) -> ()) | |
{ | |
spr("CK readRecords:\(recordInfos)") | |
guard isSyncStateValid() else { | |
completion(nil) | |
return | |
} | |
db.add(fetchOp(recordInfos) { [weak self] ckRecordIdToRecords, err in | |
guard let s = self else { return } | |
if let err = err { | |
let shouldRetry = s.handleErr(err, src: "CK readRecords") | |
if shouldRetry { | |
guard retryDelay <= s.maxRetryDelay else { | |
er("CK readRecords passed maxRetryDelay - completion with nil", .cloudKit) | |
completion(nil) | |
return | |
} | |
spr("readRecords retryDelay:\(retryDelay)") | |
delayToBg(retryDelay.t) { | |
s.readRecords(recordInfos, | |
retryDelay: nextRetryDelay(retryDelay), | |
completion: completion) | |
} | |
} else { | |
spr("!!! CK readRecords err:\(err.desc)") | |
s.syncerDelegate?.shouldRestartAndResetMetaUponFinish(for: s.assocProvider) | |
// If already reading single record, then we're done | |
guard recordInfos.count > 1 else { | |
completion(nil) | |
return | |
} | |
// Try to fetch individual ones | |
var didGroupFinish = false | |
let dispatchGroup = DispatchGroup() | |
var ckRecordInfoToObjs = [String: BaseR]() | |
for recordInfo in recordInfos { | |
dispatchGroup.enter() | |
s.readRecords([recordInfo]) { ckRecordInfoToObj in | |
if let ckRecordInfoToObj = ckRecordInfoToObj, ckRecordInfoToObj.hasVal { | |
toWrite { | |
ckRecordInfoToObjs.merge(ckRecordInfoToObj) { (cur, new) in new } | |
} | |
} else { | |
spr("!!! CK err can't read record:\(recordInfo)") | |
} | |
if didGroupFinish { | |
er("CK readRecords partial failure read individual didGroupFinish already", .dispatchGroup) | |
} else { | |
delayToBg { | |
dispatchGroup.leave() | |
} | |
} | |
} | |
} | |
dispatchGroup.notify(queue: bgQueue) { | |
didGroupFinish = true | |
spr("CK readRecords partial failure dispatchGroup.notify") | |
var r = [String: BaseR]() | |
read { | |
r = ckRecordInfoToObjs | |
} | |
completion(r) | |
} | |
} | |
return | |
} | |
guard let ckRecordIdToRecords = ckRecordIdToRecords else { | |
completion(nil) | |
return | |
} | |
var recordInfoToObjs = [String: BaseR]() | |
for ckRecord in ckRecordIdToRecords.values { | |
guard let obj = s.makeBaseObj(from: ckRecord) else { | |
er("Expect obj", .cloudKit) | |
continue | |
} | |
recordInfoToObjs[ckRecord.recordID.recordName] = obj | |
} | |
completion(recordInfoToObjs) | |
}) | |
} | |
/// Handlers should check existing record with its update against the target record, to see if: | |
/// 1. No existing record - add | |
/// 2. Target record's update > existing record's - replace | |
/// 3. Target record's update <= existing record's - no-op as next syncing cycle will correct things up | |
func upsertRecord(_ recordInfo: String, obj: BaseR) { | |
upsertRecord(recordInfo, obj: obj, retryDelay: 0.2) | |
} | |
func upsertRecord(_ recordInfo: String, obj: BaseR, retryDelay: TimeInterval) { | |
// If not in valid sync state or haven't got the first metaRecord, then we can ignore | |
// for now, and when the sync is backed on or fetching the record, the updated record | |
// will be detected by syncer. | |
guard metaRecordFirstFetched, isSyncStateValid() else { return } | |
guard !isUpserting else { | |
delayToBg(retryDelay) { | |
self.upsertRecord(recordInfo, obj: obj, retryDelay: retryDelay) | |
} | |
return | |
} | |
#if DEBUG | |
if recordInfo.contains(debugSyncWatchId) { | |
pr("watch record:\(recordInfo)") | |
pr() | |
} | |
#endif | |
isUpserting = true | |
defer { | |
delaySoon { | |
self.isUpserting = false | |
} | |
} | |
let update = obj.updateForRecord(recordInfo) | |
#if DEBUG | |
pr("CK upsertRecord firstLine:\(String(describing: (obj as? NoteObj)?.text.firstLine())) update:\(update) recordInfo:\(recordInfo)") | |
#else | |
spr("CK upsertRecord retryDelay:\(retryDelay) update:\(update) recordInfo:\(recordInfo)") | |
#endif | |
guard let info = Record.Info(recordInfo) else { | |
er("CK upsertRecord can't gen info recordInfo:\(recordInfo)", .cloudKit) | |
return | |
} | |
if info.isData { | |
guard obj.dataFileName.hasVal && obj.loadData(readObjDataFirst: false) != nil else { | |
er("CK upsertRecord couldn't loadData - abort. recordInfo:\(recordInfo)", .cloudKit) | |
return | |
} | |
} | |
var wasDeleted = false | |
read { | |
if self.deletedRecordInfos.contains(recordInfo) { | |
er("CK upsertRecord but was in deletedRecordInfos - removing from deletedRecordInfos. recordInfo:\(recordInfo)", .cloudKit) | |
wasDeleted = true | |
} | |
} | |
guard !wasDeleted else { return } | |
var isNewerThanExisting = true | |
read { | |
if let existingUpdate = self.metaRecordInfoToUpdates[recordInfo] { | |
// Equal update is fine because e.g. a note added a media, then note's update field might not | |
// be further updated yet; otherwise, if user set date to future and put stuff then turn dates | |
// back, then if update is within 1 min then we can still go | |
guard update >= existingUpdate || update.t > Date().t - minInSecs else { | |
isNewerThanExisting = false | |
return | |
} | |
} | |
} | |
guard isNewerThanExisting else { return } | |
// Fetch then write | |
let recordID = CKRecord.ID(recordName: recordInfo) | |
db.fetch(withRecordID: recordID) { [weak self] existingRecord, err in | |
guard let s = self else { return } | |
defer { | |
s.isUpserting = false | |
} | |
#if DEBUG | |
if recordInfo.contains(debugSyncWatchId) { | |
pr("watch record:\(recordInfo)") | |
pr() | |
} | |
#endif | |
// Error is normal if the record didn't exist | |
if let err = err { | |
if let err = err as? CKError, err.code == CKError.unknownItem { | |
// 11 is not found for new records | |
} else { | |
er("CK upsertRecord fetchRecord err:\(err) recordInfo:\(recordInfo)", .cloudKit) | |
} | |
} | |
var existingUpdate = 0 | |
if let existingRecord = existingRecord { | |
if info.isData, let update = existingRecord["dataUpdate"] as? Int { | |
existingUpdate = update | |
} else if let update = existingRecord["update"] as? Int { | |
existingUpdate = update | |
} | |
} | |
// NOTE: baseRecord (existingRecord) will be modified if not nil | |
guard let record = s.makeRecord(from: obj, recordID: recordID, baseRecord: existingRecord) else { | |
er("CK upsertRecord expect record from recordInfo:\(recordInfo) obj:\(obj)", .cloudKit) | |
return | |
} | |
if info.isData { | |
if obj.dataUpdate == 0 && update == 0 { | |
er("CK upsertRecord isData obj.dataUpdate & update are both 0 recordInfo:\(recordInfo)", .cloudKit) | |
} | |
// If local obj got updated again before the fetch returned, the update will be inconsistent, where | |
// we can just return as a newer upsert operation will take place | |
guard obj.dataUpdate <= update else { | |
spr("CK upsertRecord isData update:\(update) != obj dataUpdate:\(obj.dataUpdate) - abort. recordInfo:\(recordInfo)") | |
return | |
} | |
// Non-data is either Media or Note, and they both use `update` | |
} else { | |
// If local obj got updated again before the fetch returned, the update will be inconsistent, where | |
// we can just return as a newer upsert operation will take place | |
guard obj.update <= update else { | |
spr("CK upsertRecord update:\(update) < obj update:\(obj.update) recordInfo:\(recordInfo)") | |
return | |
} | |
} | |
// Equal update is fine because e.g. a note added a media, then note's update field might not | |
// be further updated yet; otherwise, if user set date to future and put stuff then turn dates | |
// back, then if update is within 1 min then we can still go | |
guard obj.update >= existingUpdate || obj.update.t > Date().t - minInSecs else { | |
let msg = "CK upsertRecord existing record newer \(existingUpdate) but not in meta; upserting record update:\(update) recordInfo:\(recordInfo)" | |
spr(msg) | |
toWrite { | |
s.metaRecordInfoToUpdates[recordInfo] = existingUpdate | |
} | |
s.scheduleUpdateCKMeta(msg) | |
return | |
} | |
s.db.save(record) { record, err in | |
if let err = err { | |
let shouldRetry = s.handleErr(err, src: "upsertRecord saveRecord \(recordInfo)") | |
if shouldRetry { | |
guard retryDelay <= s.maxRetryDelay else { return } | |
spr("CK upsertRecord save retryDelay:\(retryDelay) err:\(err.desc) recordInfo:\(recordInfo)") | |
delayToBg(retryDelay) { | |
s.upsertRecord(recordInfo, obj: obj, retryDelay: nextRetryDelay(retryDelay)) | |
} | |
return | |
} | |
return | |
} | |
spr("CK upsertRecord save completed recordInfo:\(recordInfo)") | |
toWrite { | |
s.metaRecordInfoToUpdates[recordInfo] = update | |
} | |
s.scheduleUpdateCKMeta("upsertRecord \(recordInfo)") | |
} | |
} | |
} | |
func deleteRecord(_ recordInfo: String) { | |
deleteRecord(recordInfo, induced: false) | |
} | |
// Induced means we try to delete a _data corresponding record or vice versa, in which case err is normal and don't need reporting | |
// markDelete should only be NO in case of CK record read err, in which case we delete the cloud record and re-upload from local | |
func deleteRecord(_ recordInfo: String, induced: Bool, markDelete: Bool = true, retryDelay: TimeInterval = 3) { | |
guard metaRecordFirstFetched, isSyncStateValid() else { | |
pr("CK deleteRecord \(recordInfo) without metaRecordFirstFetched - delay") | |
delayToBg(retryDelay) { | |
self.deleteRecord(recordInfo, induced: induced, markDelete: markDelete, retryDelay: nextRetryDelay(retryDelay)) | |
} | |
return | |
} | |
if induced { | |
spr("CK deleteRecord induced \(recordInfo)") | |
} else { | |
spr("CK deleteRecord \(recordInfo)") | |
} | |
db.delete(withRecordID: CKRecord.ID(recordName: recordInfo)) { [weak self] recordID, err in | |
guard let s = self else { return } | |
if let err = err { | |
if !induced { | |
er("CK deleteRecord err:\(err) for \(recordInfo)", .cloudKit) | |
} | |
// Retry 3 times | |
if retryDelay < 13 { | |
delayToBg(retryDelay) { | |
s.deleteRecord(recordInfo, induced: induced, markDelete: markDelete, retryDelay: nextRetryDelay(retryDelay)) | |
} | |
} | |
return | |
} | |
spr("CK deleteRecord succeeded for \(recordInfo)") | |
toWrite { | |
s.metaRecordInfoToUpdates.removeValue(forKey: recordInfo) | |
s.deletedRecordInfos.append(recordInfo) | |
toBg { | |
s.scheduleUpdateCKMeta("deleteRecord") | |
guard induced else { return } | |
// Induce data or meta counterpart | |
guard let info = Record.Info(recordInfo) else { | |
er("CK deleteRecord recordInfo:\(recordInfo) can't gen info", .cloudKit) | |
return | |
} | |
if let _ = info.mediaID { | |
var inducedRecordInfo = "" | |
if info.isData { | |
inducedRecordInfo = recordInfo.substring(to: recordInfo.count - Record.dataSuffix.count - Record.lv1.count) | |
pr("inducedRecordInfo:\(inducedRecordInfo)") | |
s.deleteRecord(inducedRecordInfo, induced: true) | |
} else { | |
inducedRecordInfo = recordInfo + Record.lv1 + Record.dataSuffix | |
pr("inducedRecordInfo:\(inducedRecordInfo)") | |
} | |
s.deleteRecord(inducedRecordInfo, induced: true) | |
} | |
} | |
} | |
} | |
} | |
// MARK: Meta | |
/* | |
resetMeta is mostly a server operation to rebuild meta, with local delete marks. Steps: | |
1. Clear local records with the exception that local marked deletes are kept | |
2. Trigger CK query ops to fetch all records with ONLY the desired keys | |
* Firstly query for notes, then query for medias. | |
3. Upon ops completion, updates metaRecords and upload to CK server. | |
It should only be called upon: | |
1. User forced sync | |
2. Detected corrupted data | |
3. Errors when fetching meta (except not found error, which calls updateCKMeta directly) | |
*/ | |
// When 1) User forces to sync, 2) Out-of-sync meta is detected, Syncer will call this to re-construct meta | |
public func resetMeta(_ completion: @escaping SuccessBlock) { | |
resetMeta("Syncer delegate resetMeta", completion: completion) | |
} | |
public func resetMeta(_ src: String, completion: @escaping SuccessBlock) { | |
spr("CK resetMeta src:\(src)") | |
guard isSyncStateValid() else { completion(false); return } | |
notesQueryOp?.cancel() | |
mediasQueryOp?.cancel() | |
toWrite { | |
self.metaRecordInfoToUpdates.removeAll() | |
self.resetMetaRecordFetchedCount = 0 | |
toBg { | |
self.doResetMeta(src, completion: completion) | |
} | |
} | |
} | |
private func doResetMeta(_ src: String, completion: @escaping SuccessBlock) { | |
// desiredKeys are fields that are needed besides recordName | |
let desiredKeys = ["update", "dataUpdate"] | |
// Will query for notes first then add mediasQueryOp, so when this is finished we're done | |
let mediasQueryOp = queryOp(on: db, | |
recordType: RecordType.media.rawValue, | |
desiredKeys: desiredKeys, | |
recordFetchedBlock: processQueryOpFetchedRecord) | |
{ [weak self] finished in | |
guard let s = self else { completion(false); return } | |
guard finished else { | |
read { | |
er("mediasQop didn't finish correctly fetched count:\(s.resetMetaRecordFetchedCount) - abort resetMeta", .cloudKit) | |
} | |
completion(false) | |
return | |
} | |
// Reaching here means bareRecordFetched has processed each Note, Media meta & (induced) | |
// Media Data record into metaRecordInfoToUpdates | |
read { | |
spr("CK resetMeta media records fetched count:\(s.resetMetaRecordFetchedCount) - totalCompleted") | |
} | |
// Want a delay for qop synced updates to metaRecordInfo to finish | |
delayToBg(soon) { | |
s.updateCKMeta("resetMeta", forceUpdate: true) { finished in | |
#if DEBUG | |
pr("###") | |
//pr("CK resetMeta final metaRecordInfoToUpdates:") | |
//pr("\(s.metaRecordInfoToUpdates)") | |
pr("CK resetMeta final metaRecordInfoToUpdates count:\(s.metaRecordInfoToUpdates.count)") | |
pr("###") | |
#endif | |
completion(finished) | |
} | |
} | |
} | |
// Query for notes first, then medias | |
let notesQueryOp = queryOp(on: db, | |
recordType: RecordType.note.rawValue, | |
desiredKeys: desiredKeys, | |
recordFetchedBlock: processQueryOpFetchedRecord) | |
{ [weak self] finished in | |
guard let s = self else { completion(false); return } | |
guard finished else { | |
er("notesQop didn't finish correctly fetched count:\(s.resetMetaRecordFetchedCount) - abort resetMeta", .cloudKit) | |
completion(false) | |
return | |
} | |
read { | |
spr("CK resetMeta note records fetched count:\(s.resetMetaRecordFetchedCount) - onto Media...") | |
} | |
s.db.add(mediasQueryOp) | |
} | |
db.add(notesQueryOp) | |
self.notesQueryOp = notesQueryOp | |
self.mediasQueryOp = mediasQueryOp | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment