Created
July 6, 2018 10:12
-
-
Save MarioBajr/d297cc0144a6e8fca4f6bd559946484b to your computer and use it in GitHub Desktop.
AWS Mobile AppSync SDK iOS - Observable Subscriptions State Diff
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
From ccd1a51084111d22a87cbe032e4b3b748836fb70 Mon Sep 17 00:00:00 2001 | |
From: Mario Araujo <mario.araujo@ef.com> | |
Date: Fri, 6 Jul 2018 10:55:06 +0100 | |
Subject: [PATCH] - Exposed subscription connection states via | |
SubscriptionStatusObserver, passing as an optional argument in subscribe | |
method - Moved AWSAppSyncSubscriptionError from struct to enum with more | |
detailed errors - Removed unnecessary SubscriptionsOrderHelper class | |
--- | |
AWSAppSyncClient/AWSAppSyncClient.swift | 67 ++++++++-- | |
.../AWSAppSyncSubscriptionWatcher.swift | 115 +++++++++++------- | |
AWSAppSyncClient/AppSyncMQTTClient.swift | 51 +++++--- | |
3 files changed, 164 insertions(+), 69 deletions(-) | |
diff --git a/AWSAppSyncClient/AWSAppSyncClient.swift b/AWSAppSyncClient/AWSAppSyncClient.swift | |
index a2ca6b7..f526749 100644 | |
--- a/AWSAppSyncClient/AWSAppSyncClient.swift | |
+++ b/AWSAppSyncClient/AWSAppSyncClient.swift | |
@@ -18,6 +18,8 @@ public protocol ConnectionStateChangeHandler { | |
public typealias SubscriptionResultHandler<Operation: GraphQLSubscription> = (_ result: GraphQLResult<Operation.Data>?, _ transaction: ApolloStore.ReadWriteTransaction?, _ error: Error?) -> Void | |
+public typealias SubscriptionStatusObserver = (_ status: SubscritionWatcherStatus) -> Void | |
+ | |
public typealias OptimisticResponseBlock = (ApolloStore.ReadWriteTransaction?) -> Void | |
public typealias MutationConflictHandler<Mutation: GraphQLMutation> = (_ serverState: Snapshot?, _ taskCompletionSource: AWSTaskCompletionSource<Mutation>?, _ resultHandler: OperationResultHandler<Mutation>?) -> Void | |
@@ -304,23 +306,68 @@ public struct AWSAppSyncClientError: Error, LocalizedError { | |
} | |
} | |
-public struct AWSAppSyncSubscriptionError: Error, LocalizedError { | |
- let additionalInfo: String? | |
- let errorDetails: [String:String]? | |
+public enum AWSAppSyncSubscriptionError: Error, LocalizedError { | |
+ case connectionRefused | |
+ case connectionError | |
+ case protocolError | |
+ case requestFailed(Error) | |
+ case parseError(Error) | |
+ case disconnected | |
public var errorDescription: String? { | |
- return additionalInfo ?? "Unable to start subscription." | |
+ switch self { | |
+ case .requestFailed(let error): | |
+ return error.localizedDescription | |
+ case .parseError(let error): | |
+ return error.localizedDescription | |
+ default: | |
+ return "Subscription Terminated." | |
+ } | |
} | |
public var recoverySuggestion: String? { | |
- return errorDetails?["recoverySuggestion"] | |
+ switch self { | |
+ case .requestFailed(_), .parseError(_): | |
+ return nil | |
+ default: | |
+ return "Restart subscription request." | |
+ } | |
} | |
public var failureReason: String? { | |
- return errorDetails?["failureReason"] | |
+ switch self { | |
+ case .requestFailed(_), .parseError(_): | |
+ return nil | |
+ default: | |
+ return "Disconnected from service." | |
+ } | |
+ } | |
+ | |
+ @available(*, deprecated, message: "use errorDescription instead") | |
+ var additionalInfo: String? { | |
+ switch self { | |
+ case .connectionRefused, .connectionError, .protocolError, .disconnected: | |
+ return "Subscription Terminated." | |
+ case .requestFailed(let error): | |
+ return error.localizedDescription | |
+ case .parseError(let error): | |
+ return error.localizedDescription | |
+ } | |
+ } | |
+ | |
+ @available(*, deprecated, message: "use recoverySuggestion and failureReason instead") | |
+ var errorDetails: [String: String]? { | |
+ switch self { | |
+ case .connectionRefused, .connectionError, .protocolError, .disconnected: | |
+ return ["recoverySuggestion" : "Restart subscription request.", | |
+ "failureReason" : "Disconnected from service."] | |
+ default: | |
+ return nil | |
+ } | |
} | |
} | |
+ | |
protocol NetworkConnectionNotification { | |
func onNetworkAvailabilityStatusChanged(isEndpointReachable: Bool) | |
} | |
@@ -445,7 +492,10 @@ public class AWSAppSyncClient: NetworkConnectionNotification { | |
return apolloClient!.watch(query: query, cachePolicy: cachePolicy, queue: queue, resultHandler: resultHandler) | |
} | |
- public func subscribe<Subscription: GraphQLSubscription>(subscription: Subscription, queue: DispatchQueue = DispatchQueue.main, resultHandler: @escaping SubscriptionResultHandler<Subscription>) throws -> AWSAppSyncSubscriptionWatcher<Subscription>? { | |
+ public func subscribe<Subscription: GraphQLSubscription>(subscription: Subscription, | |
+ queue: DispatchQueue = DispatchQueue.main, | |
+ resultHandler: @escaping SubscriptionResultHandler<Subscription>, | |
+ statusObserver: SubscriptionStatusObserver? = nil) throws -> AWSAppSyncSubscriptionWatcher<Subscription>? { | |
return AWSAppSyncSubscriptionWatcher(client: self.appSyncMQTTClient, | |
httpClient: self.httpTransport!, | |
@@ -453,7 +503,8 @@ public class AWSAppSyncClient: NetworkConnectionNotification { | |
subscriptionsQueue: self.subscriptionsQueue, | |
subscription: subscription, | |
handlerQueue: queue, | |
- resultHandler: resultHandler) | |
+ resultHandler: resultHandler, | |
+ statusObserver: statusObserver) | |
} | |
/// Performs a mutation by sending it to the server. | |
diff --git a/AWSAppSyncClient/AWSAppSyncSubscriptionWatcher.swift b/AWSAppSyncClient/AWSAppSyncSubscriptionWatcher.swift | |
index 013483f..654fefd 100644 | |
--- a/AWSAppSyncClient/AWSAppSyncSubscriptionWatcher.swift | |
+++ b/AWSAppSyncClient/AWSAppSyncSubscriptionWatcher.swift | |
@@ -5,40 +5,26 @@ | |
import Dispatch | |
-protocol MQTTSubscritionWatcher { | |
- func getIdentifier() -> Int | |
- func getTopics() -> [String] | |
- func messageCallbackDelegate(data: Data) | |
- func disconnectCallbackDelegate(error: Error) | |
+public enum SubscritionWatcherStatus { | |
+ case authenticating | |
+ case authenticated | |
+ case connecting | |
+ case connected | |
+ case disconnected | |
+ case connectionRefused | |
+ case connectionError | |
+ case protocolError | |
+ case requestFailed(Error) | |
} | |
-class SubscriptionsOrderHelper { | |
- var count = 0 | |
- var previousCall = Date() | |
- var pendingCount = 0 | |
- var dispatchLock = DispatchQueue(label: "SubscriptionsQueue") | |
- var waitDictionary = [0: true] | |
- static let sharedInstance = SubscriptionsOrderHelper() | |
- | |
- func getLatestCount() -> Int { | |
- count = count + 1 | |
- waitDictionary[count] = false | |
- return count | |
- } | |
- | |
- func markDone(id: Int) { | |
- waitDictionary[id] = true | |
- } | |
- | |
- func shouldWait(id: Int) -> Bool { | |
- for i in 0..<id { | |
- if (waitDictionary[i] == false) { | |
- return true | |
- } | |
- } | |
- return false | |
- } | |
+protocol MQTTSubscritionWatcher: class { | |
+ var status: SubscritionWatcherStatus { get set } | |
+ func getTopics() -> [String] | |
+ func messageCallbackDelegate(data: Data) | |
+ func statusDidChangeDelegate(status: SubscritionWatcherStatus) | |
+ @available(*, deprecated) | |
+ func getIdentifier() -> Int | |
} | |
/// A `AWSAppSyncSubscriptionWatcher` is responsible for watching the subscription, and calling the result handler with a new result whenever any of the data is published on the MQTT topic. It also normalizes the cache before giving the callback to customer. | |
@@ -49,11 +35,26 @@ public final class AWSAppSyncSubscriptionWatcher<Subscription: GraphQLSubscripti | |
let subscription: Subscription? | |
let handlerQueue: DispatchQueue | |
let resultHandler: SubscriptionResultHandler<Subscription> | |
+ let statusObserver: SubscriptionStatusObserver? | |
internal var subscriptionTopic: [String]? | |
let store: ApolloStore | |
- public let uniqueIdentifier = SubscriptionsOrderHelper.sharedInstance.getLatestCount() | |
+ @available(*, deprecated) | |
+ public let uniqueIdentifier = UUID().hashValue | |
+ var status: SubscritionWatcherStatus = .authenticating { | |
+ didSet { | |
+ self.statusObserver?(status) | |
+ self.reportErrorIfNeeded() | |
+ } | |
+ } | |
- init(client: AppSyncMQTTClient, httpClient: AWSNetworkTransport, store: ApolloStore, subscriptionsQueue: DispatchQueue, subscription: Subscription, handlerQueue: DispatchQueue, resultHandler: @escaping SubscriptionResultHandler<Subscription>) { | |
+ init(client: AppSyncMQTTClient, | |
+ httpClient: AWSNetworkTransport, | |
+ store: ApolloStore, | |
+ subscriptionsQueue: DispatchQueue, | |
+ subscription: Subscription, | |
+ handlerQueue: DispatchQueue, | |
+ resultHandler: @escaping SubscriptionResultHandler<Subscription>, | |
+ statusObserver: SubscriptionStatusObserver? = nil) { | |
self.client = client | |
self.httpClient = httpClient | |
self.store = store | |
@@ -64,6 +65,15 @@ public final class AWSAppSyncSubscriptionWatcher<Subscription: GraphQLSubscripti | |
resultHandler(result, transaction, error) | |
} | |
} | |
+ if let statusObserver = statusObserver { | |
+ self.statusObserver = { (status) in | |
+ handlerQueue.async { | |
+ statusObserver(status) | |
+ } | |
+ } | |
+ } else { | |
+ self.statusObserver = nil | |
+ } | |
subscriptionsQueue.async { [weak self] in | |
self?.startSubscription() | |
} | |
@@ -78,7 +88,7 @@ public final class AWSAppSyncSubscriptionWatcher<Subscription: GraphQLSubscripti | |
self.performSubscriptionRequest(completionHandler: { [weak self] (success, error) in | |
if let error = error { | |
- self?.resultHandler(nil, nil, error) | |
+ self?.status = .requestFailed(error) | |
} | |
semaphore.signal() | |
}) | |
@@ -94,19 +104,19 @@ public final class AWSAppSyncSubscriptionWatcher<Subscription: GraphQLSubscripti | |
let subscriptionResult = try AWSGraphQLSubscriptionResponseParser(body: response).parseResult() | |
if let subscriptionInfo = subscriptionResult.subscriptionInfo { | |
self.subscriptionTopic = subscriptionResult.newTopics | |
- self.client?.addWatcher(watcher: self, topics: subscriptionResult.newTopics!, identifier: self.uniqueIdentifier) | |
+ self.client?.addWatcher(watcher: self, topics: subscriptionResult.newTopics!) | |
self.client?.startSubscriptions(subscriptionInfo: subscriptionInfo) | |
} | |
completionHandler(true, nil) | |
} catch { | |
- completionHandler(false, AWSAppSyncSubscriptionError(additionalInfo: error.localizedDescription, errorDetails: nil)) | |
+ completionHandler(false, error) | |
} | |
} else if let error = error { | |
- completionHandler(false, AWSAppSyncSubscriptionError(additionalInfo: error.localizedDescription, errorDetails: nil)) | |
+ completionHandler(false, error) | |
} | |
}) | |
} catch { | |
- completionHandler(false, AWSAppSyncSubscriptionError(additionalInfo: error.localizedDescription, errorDetails: nil)) | |
+ completionHandler(false, error) | |
} | |
} | |
@@ -114,10 +124,6 @@ public final class AWSAppSyncSubscriptionWatcher<Subscription: GraphQLSubscripti | |
return subscriptionTopic ?? [String]() | |
} | |
- func disconnectCallbackDelegate(error: Error) { | |
- self.resultHandler(nil, nil, error) | |
- } | |
- | |
func messageCallbackDelegate(data: Data) { | |
do { | |
let datastring = NSString(data: data, encoding: String.Encoding.utf8.rawValue)! as String | |
@@ -137,10 +143,31 @@ public final class AWSAppSyncSubscriptionWatcher<Subscription: GraphQLSubscripti | |
} | |
} | |
}.catch { error in | |
- self.resultHandler(nil, nil, error) | |
+ self.resultHandler(nil, nil, AWSAppSyncSubscriptionError.parseError(error)) | |
} | |
} catch { | |
- self.resultHandler(nil, nil, error) | |
+ self.resultHandler(nil, nil, AWSAppSyncSubscriptionError.parseError(error)) | |
+ } | |
+ } | |
+ | |
+ func statusDidChangeDelegate(status: SubscritionWatcherStatus) { | |
+ self.status = status | |
+ } | |
+ | |
+ private func reportErrorIfNeeded() { | |
+ switch self.status { | |
+ case .authenticating, .authenticated, .connecting, .connected: | |
+ break | |
+ case .connectionRefused: | |
+ self.resultHandler(nil, nil, AWSAppSyncSubscriptionError.connectionRefused) | |
+ case .connectionError: | |
+ self.resultHandler(nil, nil, AWSAppSyncSubscriptionError.connectionError) | |
+ case .protocolError: | |
+ self.resultHandler(nil, nil, AWSAppSyncSubscriptionError.protocolError) | |
+ case .requestFailed(let error): | |
+ self.resultHandler(nil, nil, AWSAppSyncSubscriptionError.requestFailed(error)) | |
+ case .disconnected: | |
+ self.resultHandler(nil, nil, AWSAppSyncSubscriptionError.disconnected) | |
} | |
} | |
diff --git a/AWSAppSyncClient/AppSyncMQTTClient.swift b/AWSAppSyncClient/AppSyncMQTTClient.swift | |
index 7edf6a7..37ec609 100644 | |
--- a/AWSAppSyncClient/AppSyncMQTTClient.swift | |
+++ b/AWSAppSyncClient/AppSyncMQTTClient.swift | |
@@ -32,24 +32,20 @@ class AppSyncMQTTClient: MQTTClientDelegate { | |
return | |
} | |
- if status.rawValue == 2 { | |
- for topic in topics { | |
- mqttClient.subscribe(toTopic: topic, qos: 1, extendedCallback: nil) | |
- } | |
- } else if status.rawValue >= 3 { | |
- let error = AWSAppSyncSubscriptionError(additionalInfo: "Subscription Terminated.", errorDetails: [ | |
- "recoverySuggestion" : "Restart subscription request.", | |
- "failureReason" : "Disconnected from service."]) | |
- | |
- topics.map({ self.topicSubscribersDictionary[$0] }) | |
- .flatMap({$0}) | |
- .flatMap({$0}) | |
- .forEach({$0.disconnectCallbackDelegate(error: error)}) | |
+ switch status { | |
+ case .connected: | |
+ topics.forEach({ mqttClient.subscribe(toTopic: $0, qos: 1, extendedCallback: nil) }) | |
+ default: | |
+ break | |
} | |
+ | |
+ topics.map({ self.topicSubscribersDictionary[$0] }) | |
+ .flatMap({ $0 }) | |
+ .flatMap({ $0 }).forEach({ $0.status = SubscritionWatcherStatus(status: status) }) | |
} | |
} | |
- func addWatcher(watcher: MQTTSubscritionWatcher, topics: [String], identifier: Int) { | |
+ func addWatcher(watcher: MQTTSubscritionWatcher, topics: [String]) { | |
self.subscriptionsQueue.async { [weak self] in | |
guard let `self` = self else { | |
return | |
@@ -142,7 +138,7 @@ class AppSyncMQTTClient: MQTTClientDelegate { | |
usingCancelling subscription: MQTTSubscritionWatcher) -> [String: [MQTTSubscritionWatcher]] { | |
return topicSubscribersDictionary.reduce(into: [:]) { (result, element) in | |
- result[element.key] = removedSubscriber(array: element.value, of: subscription.getIdentifier()) | |
+ result[element.key] = removedSubscriber(array: element.value, of: subscription) | |
} | |
} | |
@@ -164,7 +160,28 @@ class AppSyncMQTTClient: MQTTClientDelegate { | |
/// - array: [MQTTSubscritionWatcher] | |
/// - id: Int | |
/// - Returns: updated array [MQTTSubscritionWatcher] | |
- private func removedSubscriber(array: [MQTTSubscritionWatcher], of id: Int) -> [MQTTSubscritionWatcher] { | |
- return array.filter({$0.getIdentifier() != id }) | |
+ private func removedSubscriber(array: [MQTTSubscritionWatcher], of subscription: MQTTSubscritionWatcher) -> [MQTTSubscritionWatcher] { | |
+ return array.filter({ $0 !== subscription }) | |
+ } | |
+} | |
+ | |
+internal extension SubscritionWatcherStatus { | |
+ internal init(status: MQTTStatus) { | |
+ switch status { | |
+ case .connecting: | |
+ self = .connecting | |
+ case .connected: | |
+ self = .connected | |
+ case .connectionError: | |
+ self = .connectionError | |
+ case .connectionRefused: | |
+ self = .connectionRefused | |
+ case .protocolError: | |
+ self = .protocolError | |
+ case .disconnected: | |
+ self = .disconnected | |
+ case .unknown: | |
+ self = .authenticated | |
+ } | |
} | |
} | |
-- | |
2.17.1 | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment