Skip to content

Instantly share code, notes, and snippets.

@hyouuu
Created October 22, 2019 21:55
Show Gist options
  • Save hyouuu/d9efc663d1e7a153b029c31be20149e4 to your computer and use it in GitHub Desktop.
Save hyouuu/d9efc663d1e7a153b029c31be20149e4 to your computer and use it in GitHub Desktop.
//
// CKSyncHandler+SyncHandlerDelegate.swift
// Pendo
//
// Created by Shawn Gong on 5/29/19.
// Copyright © 2019 Lychee Isle. All rights reserved.
//
import CloudKit
extension CKSyncHandler: SyncHandlerDelegate {
// MARK: Sync Notification
func tryProcessSyncNotif(_ userInfo: [AnyHashable : Any]) -> Bool {
// Try parsing to CKQueryNotification
guard let ckQueryNotif = CKQueryNotification(fromRemoteNotificationDictionary: userInfo),
ckQueryNotif.notificationType == .query else { return false }
spr("CKQueryNotif:\(ckQueryNotif)")
guard let recordID = ckQueryNotif.recordID else {
// If no exact info, initiate a full sync schedule
Syncer.one.schedule("Received CKQueryNotif")
return true
}
let recordInfo = recordID.recordName
guard recordInfo != metaRecordName else {
er("recordInfo == metaRecordName: \(metaRecordName)", .cloudKit)
return true
}
if recordInfo == userSettingsRecordName {
syncerDelegate?.shouldSyncUserSettings(from: assocProvider, src: "Received CKQueryNotif for UserSettings")
return true
}
syncerDelegate?.pushSyncWillStart()
switch ckQueryNotif.queryNotificationReason {
case .recordCreated, .recordUpdated:
readRecord(recordInfo) { [weak self] obj in
defer {
self?.syncerDelegate?.pushSyncDidStop()
}
guard let obj = obj else {
er("Expect obj for \(recordInfo)", .cloudKit)
return
}
self?.recordUpserted(recordInfo, obj: obj)
}
case .recordDeleted:
recordDeleted(recordInfo)
syncerDelegate?.pushSyncDidStop()
@unknown default:
er("Unknown case", .cloudKit)
syncerDelegate?.pushSyncDidStop()
}
return true
}
// MARK: User Settings
/// (userSettings, update)
func readUserSettings(completion: @escaping ([String: String], Int) -> ()) {
readUserSettings(retryDelay: 0.5, completion: completion)
}
/// (userSettings, update)
func readUserSettings(retryDelay: TimeInterval = 0.5, completion: @escaping ([String: String], Int) -> ()) {
db.fetch(withRecordID: CKRecord.ID(recordName: userSettingsRecordName)) { [weak self] record, err in
guard let s = self else { completion([:], 0); return }
// Unless "Not Found", deal with the err and possibly resetMeta
if let err = err, !s.isRecordNotFoundErr(err) {
let shouldRetry = s.handleErr(err, src: "fetchMeta")
if shouldRetry {
guard retryDelay <= s.maxRetryDelay else { completion([:], 0); return }
spr("readUserSettings retryDelay:\(retryDelay)")
delayToBg(retryDelay) {
s.readUserSettings(retryDelay: nextRetryDelay(retryDelay), completion: completion)
}
return
}
completion([:], 0)
return
}
guard let record = record, let jsonNSData = record[s.jsonDataField] as? NSData else {
completion([:], 0)
return
}
var userSettings = [String: String]()
let jsonData = Data(referencing: jsonNSData)
let decoder = JSONDecoder()
do {
userSettings = try decoder.decode([String: String].self, from: jsonData)
} catch {
er("CK readUserSettings can't parse jsonData", .cloudKit)
}
var update = 0
if let localUpdate = userSettings[UserSetting.localUpdatedAt.rawValue], let val = Double(localUpdate) {
update = val.i
pr("CK readUserSettings update:\(update) from localUpdate")
} else if let val = record.modificationDate?.i {
update = val
pr("CK readUserSettings update:\(update) from modificationDate")
}
completion(userSettings, update)
}
}
func upsertUserSettings(_ userSettings: [String: String]) {
upsertUserSettings(userSettings, retryDelay: 0.5)
}
func upsertUserSettings(_ userSettings: [String: String], retryDelay: TimeInterval = 0.5) {
spr("CK upsertUserSettings")
var jsonData = Data()
let encoder = JSONEncoder()
do {
jsonData = try encoder.encode(userSettings)
} catch {
er("CK upsertUserSettings can't encode userSettings", .cloudKit)
}
#if DEBUG
guard let jsonString = String(data: jsonData, encoding: .utf8) else { er(); return }
pr("CK upsertUserSettings json:\(jsonString)")
#endif
let recordID = CKRecord.ID(recordName: userSettingsRecordName)
db.fetch(withRecordID: recordID) { [weak self] fetchedRecord, err in
guard let s = self else { return }
// Create if none existing
let record = fetchedRecord ?? CKRecord(recordType: RecordType.userSettings.rawValue, recordID: recordID)
// Might want to delete existing keys in the future
record[s.jsonDataField] = jsonData as NSData
s.db.save(record) { [weak self] savedRecord, err in
guard let s = self else { return }
guard let err = err else {
spr("CK upsertUserSettings saved record completed")
return
}
let shouldRetry = s.handleErr(err, src: "updateCKMeta saveRecord")
if shouldRetry {
guard retryDelay <= s.maxRetryDelay else { return }
spr("CK upsertUserSettings retryDelay:\(retryDelay)")
delayToBg(retryDelay) {
s.upsertUserSettings(userSettings, retryDelay: nextRetryDelay(retryDelay))
}
}
}
}
}
// MARK: Records
// recordNames, deletedRecordInfos
func listRecordNames(_ completion: @escaping ([String], OrderedSet<String>) -> ()) {
spr("CK listRecordNames")
// Need to refetch every time because if we use cached metaRecordNames then there's no way to know what's on cloud. The meta file shouldn't be too big so shouldn't be a problem for now
fetchMeta("listRecordNames") { [weak self] finished in
guard let s = self else { completion([], []); return }
var recordNames = [String]()
var deletedRecordInfos = OrderedSet<String>()
read {
for (recordInfo, update) in s.metaRecordInfoToUpdates {
recordNames.append(Record.makeName(recordInfo: recordInfo, update: update))
}
deletedRecordInfos = s.deletedRecordInfos
spr("CK listRecordNames recordNames count:\(recordNames.count) deleted count:\(deletedRecordInfos.count)")
}
completion(recordNames, deletedRecordInfos)
}
}
/// [recordInfo : obj]
func readRecords(_ recordInfos: [String], completion: @escaping ([String: BaseR]?) -> ()) {
readRecords(recordInfos, retryDelay: 0.5, completion: completion)
}
func readRecord(_ recordInfo: String, completion: @escaping (BaseR?) -> ()) {
readRecords([recordInfo]) { recordInfoToObjs in
completion(recordInfoToObjs?[recordInfo])
}
}
// [recordInfo : obj]
func readRecords(_ recordInfos: [String],
retryDelay: TimeInterval = 0.5,
completion: @escaping ([String: BaseR]?) -> ())
{
spr("CK readRecords:\(recordInfos)")
guard isSyncStateValid() else {
completion(nil)
return
}
db.add(fetchOp(recordInfos) { [weak self] ckRecordIdToRecords, err in
guard let s = self else { return }
if let err = err {
let shouldRetry = s.handleErr(err, src: "CK readRecords")
if shouldRetry {
guard retryDelay <= s.maxRetryDelay else {
er("CK readRecords passed maxRetryDelay - completion with nil", .cloudKit)
completion(nil)
return
}
spr("readRecords retryDelay:\(retryDelay)")
delayToBg(retryDelay.t) {
s.readRecords(recordInfos,
retryDelay: nextRetryDelay(retryDelay),
completion: completion)
}
} else {
spr("!!! CK readRecords err:\(err.desc)")
s.syncerDelegate?.shouldRestartAndResetMetaUponFinish(for: s.assocProvider)
// If already reading single record, then we're done
guard recordInfos.count > 1 else {
completion(nil)
return
}
// Try to fetch individual ones
var didGroupFinish = false
let dispatchGroup = DispatchGroup()
var ckRecordInfoToObjs = [String: BaseR]()
for recordInfo in recordInfos {
dispatchGroup.enter()
s.readRecords([recordInfo]) { ckRecordInfoToObj in
if let ckRecordInfoToObj = ckRecordInfoToObj, ckRecordInfoToObj.hasVal {
toWrite {
ckRecordInfoToObjs.merge(ckRecordInfoToObj) { (cur, new) in new }
}
} else {
spr("!!! CK err can't read record:\(recordInfo)")
}
if didGroupFinish {
er("CK readRecords partial failure read individual didGroupFinish already", .dispatchGroup)
} else {
delayToBg {
dispatchGroup.leave()
}
}
}
}
dispatchGroup.notify(queue: bgQueue) {
didGroupFinish = true
spr("CK readRecords partial failure dispatchGroup.notify")
var r = [String: BaseR]()
read {
r = ckRecordInfoToObjs
}
completion(r)
}
}
return
}
guard let ckRecordIdToRecords = ckRecordIdToRecords else {
completion(nil)
return
}
var recordInfoToObjs = [String: BaseR]()
for ckRecord in ckRecordIdToRecords.values {
guard let obj = s.makeBaseObj(from: ckRecord) else {
er("Expect obj", .cloudKit)
continue
}
recordInfoToObjs[ckRecord.recordID.recordName] = obj
}
completion(recordInfoToObjs)
})
}
/// Handlers should check existing record with its update against the target record, to see if:
/// 1. No existing record - add
/// 2. Target record's update > existing record's - replace
/// 3. Target record's update <= existing record's - no-op as next syncing cycle will correct things up
func upsertRecord(_ recordInfo: String, obj: BaseR) {
upsertRecord(recordInfo, obj: obj, retryDelay: 0.2)
}
func upsertRecord(_ recordInfo: String, obj: BaseR, retryDelay: TimeInterval) {
// If not in valid sync state or haven't got the first metaRecord, then we can ignore
// for now, and when the sync is backed on or fetching the record, the updated record
// will be detected by syncer.
guard metaRecordFirstFetched, isSyncStateValid() else { return }
guard !isUpserting else {
delayToBg(retryDelay) {
self.upsertRecord(recordInfo, obj: obj, retryDelay: retryDelay)
}
return
}
#if DEBUG
if recordInfo.contains(debugSyncWatchId) {
pr("watch record:\(recordInfo)")
pr()
}
#endif
isUpserting = true
defer {
delaySoon {
self.isUpserting = false
}
}
let update = obj.updateForRecord(recordInfo)
#if DEBUG
pr("CK upsertRecord firstLine:\(String(describing: (obj as? NoteObj)?.text.firstLine())) update:\(update) recordInfo:\(recordInfo)")
#else
spr("CK upsertRecord retryDelay:\(retryDelay) update:\(update) recordInfo:\(recordInfo)")
#endif
guard let info = Record.Info(recordInfo) else {
er("CK upsertRecord can't gen info recordInfo:\(recordInfo)", .cloudKit)
return
}
if info.isData {
guard obj.dataFileName.hasVal && obj.loadData(readObjDataFirst: false) != nil else {
er("CK upsertRecord couldn't loadData - abort. recordInfo:\(recordInfo)", .cloudKit)
return
}
}
var wasDeleted = false
read {
if self.deletedRecordInfos.contains(recordInfo) {
er("CK upsertRecord but was in deletedRecordInfos - removing from deletedRecordInfos. recordInfo:\(recordInfo)", .cloudKit)
wasDeleted = true
}
}
guard !wasDeleted else { return }
var isNewerThanExisting = true
read {
if let existingUpdate = self.metaRecordInfoToUpdates[recordInfo] {
// Equal update is fine because e.g. a note added a media, then note's update field might not
// be further updated yet; otherwise, if user set date to future and put stuff then turn dates
// back, then if update is within 1 min then we can still go
guard update >= existingUpdate || update.t > Date().t - minInSecs else {
isNewerThanExisting = false
return
}
}
}
guard isNewerThanExisting else { return }
// Fetch then write
let recordID = CKRecord.ID(recordName: recordInfo)
db.fetch(withRecordID: recordID) { [weak self] existingRecord, err in
guard let s = self else { return }
defer {
s.isUpserting = false
}
#if DEBUG
if recordInfo.contains(debugSyncWatchId) {
pr("watch record:\(recordInfo)")
pr()
}
#endif
// Error is normal if the record didn't exist
if let err = err {
if let err = err as? CKError, err.code == CKError.unknownItem {
// 11 is not found for new records
} else {
er("CK upsertRecord fetchRecord err:\(err) recordInfo:\(recordInfo)", .cloudKit)
}
}
var existingUpdate = 0
if let existingRecord = existingRecord {
if info.isData, let update = existingRecord["dataUpdate"] as? Int {
existingUpdate = update
} else if let update = existingRecord["update"] as? Int {
existingUpdate = update
}
}
// NOTE: baseRecord (existingRecord) will be modified if not nil
guard let record = s.makeRecord(from: obj, recordID: recordID, baseRecord: existingRecord) else {
er("CK upsertRecord expect record from recordInfo:\(recordInfo) obj:\(obj)", .cloudKit)
return
}
if info.isData {
if obj.dataUpdate == 0 && update == 0 {
er("CK upsertRecord isData obj.dataUpdate & update are both 0 recordInfo:\(recordInfo)", .cloudKit)
}
// If local obj got updated again before the fetch returned, the update will be inconsistent, where
// we can just return as a newer upsert operation will take place
guard obj.dataUpdate <= update else {
spr("CK upsertRecord isData update:\(update) != obj dataUpdate:\(obj.dataUpdate) - abort. recordInfo:\(recordInfo)")
return
}
// Non-data is either Media or Note, and they both use `update`
} else {
// If local obj got updated again before the fetch returned, the update will be inconsistent, where
// we can just return as a newer upsert operation will take place
guard obj.update <= update else {
spr("CK upsertRecord update:\(update) < obj update:\(obj.update) recordInfo:\(recordInfo)")
return
}
}
// Equal update is fine because e.g. a note added a media, then note's update field might not
// be further updated yet; otherwise, if user set date to future and put stuff then turn dates
// back, then if update is within 1 min then we can still go
guard obj.update >= existingUpdate || obj.update.t > Date().t - minInSecs else {
let msg = "CK upsertRecord existing record newer \(existingUpdate) but not in meta; upserting record update:\(update) recordInfo:\(recordInfo)"
spr(msg)
toWrite {
s.metaRecordInfoToUpdates[recordInfo] = existingUpdate
}
s.scheduleUpdateCKMeta(msg)
return
}
s.db.save(record) { record, err in
if let err = err {
let shouldRetry = s.handleErr(err, src: "upsertRecord saveRecord \(recordInfo)")
if shouldRetry {
guard retryDelay <= s.maxRetryDelay else { return }
spr("CK upsertRecord save retryDelay:\(retryDelay) err:\(err.desc) recordInfo:\(recordInfo)")
delayToBg(retryDelay) {
s.upsertRecord(recordInfo, obj: obj, retryDelay: nextRetryDelay(retryDelay))
}
return
}
return
}
spr("CK upsertRecord save completed recordInfo:\(recordInfo)")
toWrite {
s.metaRecordInfoToUpdates[recordInfo] = update
}
s.scheduleUpdateCKMeta("upsertRecord \(recordInfo)")
}
}
}
func deleteRecord(_ recordInfo: String) {
deleteRecord(recordInfo, induced: false)
}
// Induced means we try to delete a _data corresponding record or vice versa, in which case err is normal and don't need reporting
// markDelete should only be NO in case of CK record read err, in which case we delete the cloud record and re-upload from local
func deleteRecord(_ recordInfo: String, induced: Bool, markDelete: Bool = true, retryDelay: TimeInterval = 3) {
guard metaRecordFirstFetched, isSyncStateValid() else {
pr("CK deleteRecord \(recordInfo) without metaRecordFirstFetched - delay")
delayToBg(retryDelay) {
self.deleteRecord(recordInfo, induced: induced, markDelete: markDelete, retryDelay: nextRetryDelay(retryDelay))
}
return
}
if induced {
spr("CK deleteRecord induced \(recordInfo)")
} else {
spr("CK deleteRecord \(recordInfo)")
}
db.delete(withRecordID: CKRecord.ID(recordName: recordInfo)) { [weak self] recordID, err in
guard let s = self else { return }
if let err = err {
if !induced {
er("CK deleteRecord err:\(err) for \(recordInfo)", .cloudKit)
}
// Retry 3 times
if retryDelay < 13 {
delayToBg(retryDelay) {
s.deleteRecord(recordInfo, induced: induced, markDelete: markDelete, retryDelay: nextRetryDelay(retryDelay))
}
}
return
}
spr("CK deleteRecord succeeded for \(recordInfo)")
toWrite {
s.metaRecordInfoToUpdates.removeValue(forKey: recordInfo)
s.deletedRecordInfos.append(recordInfo)
toBg {
s.scheduleUpdateCKMeta("deleteRecord")
guard induced else { return }
// Induce data or meta counterpart
guard let info = Record.Info(recordInfo) else {
er("CK deleteRecord recordInfo:\(recordInfo) can't gen info", .cloudKit)
return
}
if let _ = info.mediaID {
var inducedRecordInfo = ""
if info.isData {
inducedRecordInfo = recordInfo.substring(to: recordInfo.count - Record.dataSuffix.count - Record.lv1.count)
pr("inducedRecordInfo:\(inducedRecordInfo)")
s.deleteRecord(inducedRecordInfo, induced: true)
} else {
inducedRecordInfo = recordInfo + Record.lv1 + Record.dataSuffix
pr("inducedRecordInfo:\(inducedRecordInfo)")
}
s.deleteRecord(inducedRecordInfo, induced: true)
}
}
}
}
}
// MARK: Meta
/*
resetMeta is mostly a server operation to rebuild meta, with local delete marks. Steps:
1. Clear local records with the exception that local marked deletes are kept
2. Trigger CK query ops to fetch all records with ONLY the desired keys
* Firstly query for notes, then query for medias.
3. Upon ops completion, updates metaRecords and upload to CK server.
It should only be called upon:
1. User forced sync
2. Detected corrupted data
3. Errors when fetching meta (except not found error, which calls updateCKMeta directly)
*/
// When 1) User forces to sync, 2) Out-of-sync meta is detected, Syncer will call this to re-construct meta
public func resetMeta(_ completion: @escaping SuccessBlock) {
resetMeta("Syncer delegate resetMeta", completion: completion)
}
public func resetMeta(_ src: String, completion: @escaping SuccessBlock) {
spr("CK resetMeta src:\(src)")
guard isSyncStateValid() else { completion(false); return }
notesQueryOp?.cancel()
mediasQueryOp?.cancel()
toWrite {
self.metaRecordInfoToUpdates.removeAll()
self.resetMetaRecordFetchedCount = 0
toBg {
self.doResetMeta(src, completion: completion)
}
}
}
private func doResetMeta(_ src: String, completion: @escaping SuccessBlock) {
// desiredKeys are fields that are needed besides recordName
let desiredKeys = ["update", "dataUpdate"]
// Will query for notes first then add mediasQueryOp, so when this is finished we're done
let mediasQueryOp = queryOp(on: db,
recordType: RecordType.media.rawValue,
desiredKeys: desiredKeys,
recordFetchedBlock: processQueryOpFetchedRecord)
{ [weak self] finished in
guard let s = self else { completion(false); return }
guard finished else {
read {
er("mediasQop didn't finish correctly fetched count:\(s.resetMetaRecordFetchedCount) - abort resetMeta", .cloudKit)
}
completion(false)
return
}
// Reaching here means bareRecordFetched has processed each Note, Media meta & (induced)
// Media Data record into metaRecordInfoToUpdates
read {
spr("CK resetMeta media records fetched count:\(s.resetMetaRecordFetchedCount) - totalCompleted")
}
// Want a delay for qop synced updates to metaRecordInfo to finish
delayToBg(soon) {
s.updateCKMeta("resetMeta", forceUpdate: true) { finished in
#if DEBUG
pr("###")
//pr("CK resetMeta final metaRecordInfoToUpdates:")
//pr("\(s.metaRecordInfoToUpdates)")
pr("CK resetMeta final metaRecordInfoToUpdates count:\(s.metaRecordInfoToUpdates.count)")
pr("###")
#endif
completion(finished)
}
}
}
// Query for notes first, then medias
let notesQueryOp = queryOp(on: db,
recordType: RecordType.note.rawValue,
desiredKeys: desiredKeys,
recordFetchedBlock: processQueryOpFetchedRecord)
{ [weak self] finished in
guard let s = self else { completion(false); return }
guard finished else {
er("notesQop didn't finish correctly fetched count:\(s.resetMetaRecordFetchedCount) - abort resetMeta", .cloudKit)
completion(false)
return
}
read {
spr("CK resetMeta note records fetched count:\(s.resetMetaRecordFetchedCount) - onto Media...")
}
s.db.add(mediasQueryOp)
}
db.add(notesQueryOp)
self.notesQueryOp = notesQueryOp
self.mediasQueryOp = mediasQueryOp
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment