Skip to content

Instantly share code, notes, and snippets.

@MarioBajr
Created July 6, 2018 10:12
Show Gist options
  • Save MarioBajr/d297cc0144a6e8fca4f6bd559946484b to your computer and use it in GitHub Desktop.
Save MarioBajr/d297cc0144a6e8fca4f6bd559946484b to your computer and use it in GitHub Desktop.
AWS Mobile AppSync SDK iOS - Observable Subscriptions State Diff
From ccd1a51084111d22a87cbe032e4b3b748836fb70 Mon Sep 17 00:00:00 2001
From: Mario Araujo <mario.araujo@ef.com>
Date: Fri, 6 Jul 2018 10:55:06 +0100
Subject: [PATCH] - Exposed subscription connection states via
SubscriptionStatusObserver, passed as an optional argument to the subscribe
method - Changed AWSAppSyncSubscriptionError from a struct to an enum with more
detailed errors - Removed the unnecessary SubscriptionsOrderHelper class
---
AWSAppSyncClient/AWSAppSyncClient.swift | 67 ++++++++--
.../AWSAppSyncSubscriptionWatcher.swift | 115 +++++++++++-------
AWSAppSyncClient/AppSyncMQTTClient.swift | 51 +++++---
3 files changed, 164 insertions(+), 69 deletions(-)
diff --git a/AWSAppSyncClient/AWSAppSyncClient.swift b/AWSAppSyncClient/AWSAppSyncClient.swift
index a2ca6b7..f526749 100644
--- a/AWSAppSyncClient/AWSAppSyncClient.swift
+++ b/AWSAppSyncClient/AWSAppSyncClient.swift
@@ -18,6 +18,8 @@ public protocol ConnectionStateChangeHandler {
public typealias SubscriptionResultHandler<Operation: GraphQLSubscription> = (_ result: GraphQLResult<Operation.Data>?, _ transaction: ApolloStore.ReadWriteTransaction?, _ error: Error?) -> Void
+public typealias SubscriptionStatusObserver = (_ status: SubscritionWatcherStatus) -> Void
+
public typealias OptimisticResponseBlock = (ApolloStore.ReadWriteTransaction?) -> Void
public typealias MutationConflictHandler<Mutation: GraphQLMutation> = (_ serverState: Snapshot?, _ taskCompletionSource: AWSTaskCompletionSource<Mutation>?, _ resultHandler: OperationResultHandler<Mutation>?) -> Void
@@ -304,23 +306,68 @@ public struct AWSAppSyncClientError: Error, LocalizedError {
}
}
-public struct AWSAppSyncSubscriptionError: Error, LocalizedError {
- let additionalInfo: String?
- let errorDetails: [String:String]?
+public enum AWSAppSyncSubscriptionError: Error, LocalizedError {
+ case connectionRefused
+ case connectionError
+ case protocolError
+ case requestFailed(Error)
+ case parseError(Error)
+ case disconnected
public var errorDescription: String? {
- return additionalInfo ?? "Unable to start subscription."
+ switch self {
+ case .requestFailed(let error):
+ return error.localizedDescription
+ case .parseError(let error):
+ return error.localizedDescription
+ default:
+ return "Subscription Terminated."
+ }
}
public var recoverySuggestion: String? {
- return errorDetails?["recoverySuggestion"]
+ switch self {
+ case .requestFailed(_), .parseError(_):
+ return nil
+ default:
+ return "Restart subscription request."
+ }
}
public var failureReason: String? {
- return errorDetails?["failureReason"]
+ switch self {
+ case .requestFailed(_), .parseError(_):
+ return nil
+ default:
+ return "Disconnected from service."
+ }
+ }
+
+ @available(*, deprecated, message: "use errorDescription instead")
+ var additionalInfo: String? {
+ switch self {
+ case .connectionRefused, .connectionError, .protocolError, .disconnected:
+ return "Subscription Terminated."
+ case .requestFailed(let error):
+ return error.localizedDescription
+ case .parseError(let error):
+ return error.localizedDescription
+ }
+ }
+
+ @available(*, deprecated, message: "use recoverySuggestion and failureReason instead")
+ var errorDetails: [String: String]? {
+ switch self {
+ case .connectionRefused, .connectionError, .protocolError, .disconnected:
+ return ["recoverySuggestion" : "Restart subscription request.",
+ "failureReason" : "Disconnected from service."]
+ default:
+ return nil
+ }
}
}
+
protocol NetworkConnectionNotification {
func onNetworkAvailabilityStatusChanged(isEndpointReachable: Bool)
}
@@ -445,7 +492,10 @@ public class AWSAppSyncClient: NetworkConnectionNotification {
return apolloClient!.watch(query: query, cachePolicy: cachePolicy, queue: queue, resultHandler: resultHandler)
}
- public func subscribe<Subscription: GraphQLSubscription>(subscription: Subscription, queue: DispatchQueue = DispatchQueue.main, resultHandler: @escaping SubscriptionResultHandler<Subscription>) throws -> AWSAppSyncSubscriptionWatcher<Subscription>? {
+ public func subscribe<Subscription: GraphQLSubscription>(subscription: Subscription,
+ queue: DispatchQueue = DispatchQueue.main,
+ resultHandler: @escaping SubscriptionResultHandler<Subscription>,
+ statusObserver: SubscriptionStatusObserver? = nil) throws -> AWSAppSyncSubscriptionWatcher<Subscription>? {
return AWSAppSyncSubscriptionWatcher(client: self.appSyncMQTTClient,
httpClient: self.httpTransport!,
@@ -453,7 +503,8 @@ public class AWSAppSyncClient: NetworkConnectionNotification {
subscriptionsQueue: self.subscriptionsQueue,
subscription: subscription,
handlerQueue: queue,
- resultHandler: resultHandler)
+ resultHandler: resultHandler,
+ statusObserver: statusObserver)
}
/// Performs a mutation by sending it to the server.
diff --git a/AWSAppSyncClient/AWSAppSyncSubscriptionWatcher.swift b/AWSAppSyncClient/AWSAppSyncSubscriptionWatcher.swift
index 013483f..654fefd 100644
--- a/AWSAppSyncClient/AWSAppSyncSubscriptionWatcher.swift
+++ b/AWSAppSyncClient/AWSAppSyncSubscriptionWatcher.swift
@@ -5,40 +5,26 @@
import Dispatch
-protocol MQTTSubscritionWatcher {
- func getIdentifier() -> Int
- func getTopics() -> [String]
- func messageCallbackDelegate(data: Data)
- func disconnectCallbackDelegate(error: Error)
+public enum SubscritionWatcherStatus {
+ case authenticating
+ case authenticated
+ case connecting
+ case connected
+ case disconnected
+ case connectionRefused
+ case connectionError
+ case protocolError
+ case requestFailed(Error)
}
-class SubscriptionsOrderHelper {
- var count = 0
- var previousCall = Date()
- var pendingCount = 0
- var dispatchLock = DispatchQueue(label: "SubscriptionsQueue")
- var waitDictionary = [0: true]
- static let sharedInstance = SubscriptionsOrderHelper()
-
- func getLatestCount() -> Int {
- count = count + 1
- waitDictionary[count] = false
- return count
- }
-
- func markDone(id: Int) {
- waitDictionary[id] = true
- }
-
- func shouldWait(id: Int) -> Bool {
- for i in 0..<id {
- if (waitDictionary[i] == false) {
- return true
- }
- }
- return false
- }
+protocol MQTTSubscritionWatcher: class {
+ var status: SubscritionWatcherStatus { get set }
+ func getTopics() -> [String]
+ func messageCallbackDelegate(data: Data)
+ func statusDidChangeDelegate(status: SubscritionWatcherStatus)
+ @available(*, deprecated)
+ func getIdentifier() -> Int
}
/// A `AWSAppSyncSubscriptionWatcher` is responsible for watching the subscription, and calling the result handler with a new result whenever any of the data is published on the MQTT topic. It also normalizes the cache before giving the callback to customer.
@@ -49,11 +35,26 @@ public final class AWSAppSyncSubscriptionWatcher<Subscription: GraphQLSubscripti
let subscription: Subscription?
let handlerQueue: DispatchQueue
let resultHandler: SubscriptionResultHandler<Subscription>
+ let statusObserver: SubscriptionStatusObserver?
internal var subscriptionTopic: [String]?
let store: ApolloStore
- public let uniqueIdentifier = SubscriptionsOrderHelper.sharedInstance.getLatestCount()
+ @available(*, deprecated)
+ public let uniqueIdentifier = UUID().hashValue
+ var status: SubscritionWatcherStatus = .authenticating {
+ didSet {
+ self.statusObserver?(status)
+ self.reportErrorIfNeeded()
+ }
+ }
- init(client: AppSyncMQTTClient, httpClient: AWSNetworkTransport, store: ApolloStore, subscriptionsQueue: DispatchQueue, subscription: Subscription, handlerQueue: DispatchQueue, resultHandler: @escaping SubscriptionResultHandler<Subscription>) {
+ init(client: AppSyncMQTTClient,
+ httpClient: AWSNetworkTransport,
+ store: ApolloStore,
+ subscriptionsQueue: DispatchQueue,
+ subscription: Subscription,
+ handlerQueue: DispatchQueue,
+ resultHandler: @escaping SubscriptionResultHandler<Subscription>,
+ statusObserver: SubscriptionStatusObserver? = nil) {
self.client = client
self.httpClient = httpClient
self.store = store
@@ -64,6 +65,15 @@ public final class AWSAppSyncSubscriptionWatcher<Subscription: GraphQLSubscripti
resultHandler(result, transaction, error)
}
}
+ if let statusObserver = statusObserver {
+ self.statusObserver = { (status) in
+ handlerQueue.async {
+ statusObserver(status)
+ }
+ }
+ } else {
+ self.statusObserver = nil
+ }
subscriptionsQueue.async { [weak self] in
self?.startSubscription()
}
@@ -78,7 +88,7 @@ public final class AWSAppSyncSubscriptionWatcher<Subscription: GraphQLSubscripti
self.performSubscriptionRequest(completionHandler: { [weak self] (success, error) in
if let error = error {
- self?.resultHandler(nil, nil, error)
+ self?.status = .requestFailed(error)
}
semaphore.signal()
})
@@ -94,19 +104,19 @@ public final class AWSAppSyncSubscriptionWatcher<Subscription: GraphQLSubscripti
let subscriptionResult = try AWSGraphQLSubscriptionResponseParser(body: response).parseResult()
if let subscriptionInfo = subscriptionResult.subscriptionInfo {
self.subscriptionTopic = subscriptionResult.newTopics
- self.client?.addWatcher(watcher: self, topics: subscriptionResult.newTopics!, identifier: self.uniqueIdentifier)
+ self.client?.addWatcher(watcher: self, topics: subscriptionResult.newTopics!)
self.client?.startSubscriptions(subscriptionInfo: subscriptionInfo)
}
completionHandler(true, nil)
} catch {
- completionHandler(false, AWSAppSyncSubscriptionError(additionalInfo: error.localizedDescription, errorDetails: nil))
+ completionHandler(false, error)
}
} else if let error = error {
- completionHandler(false, AWSAppSyncSubscriptionError(additionalInfo: error.localizedDescription, errorDetails: nil))
+ completionHandler(false, error)
}
})
} catch {
- completionHandler(false, AWSAppSyncSubscriptionError(additionalInfo: error.localizedDescription, errorDetails: nil))
+ completionHandler(false, error)
}
}
@@ -114,10 +124,6 @@ public final class AWSAppSyncSubscriptionWatcher<Subscription: GraphQLSubscripti
return subscriptionTopic ?? [String]()
}
- func disconnectCallbackDelegate(error: Error) {
- self.resultHandler(nil, nil, error)
- }
-
func messageCallbackDelegate(data: Data) {
do {
let datastring = NSString(data: data, encoding: String.Encoding.utf8.rawValue)! as String
@@ -137,10 +143,31 @@ public final class AWSAppSyncSubscriptionWatcher<Subscription: GraphQLSubscripti
}
}
}.catch { error in
- self.resultHandler(nil, nil, error)
+ self.resultHandler(nil, nil, AWSAppSyncSubscriptionError.parseError(error))
}
} catch {
- self.resultHandler(nil, nil, error)
+ self.resultHandler(nil, nil, AWSAppSyncSubscriptionError.parseError(error))
+ }
+ }
+
+ func statusDidChangeDelegate(status: SubscritionWatcherStatus) {
+ self.status = status
+ }
+
+ private func reportErrorIfNeeded() {
+ switch self.status {
+ case .authenticating, .authenticated, .connecting, .connected:
+ break
+ case .connectionRefused:
+ self.resultHandler(nil, nil, AWSAppSyncSubscriptionError.connectionRefused)
+ case .connectionError:
+ self.resultHandler(nil, nil, AWSAppSyncSubscriptionError.connectionError)
+ case .protocolError:
+ self.resultHandler(nil, nil, AWSAppSyncSubscriptionError.protocolError)
+ case .requestFailed(let error):
+ self.resultHandler(nil, nil, AWSAppSyncSubscriptionError.requestFailed(error))
+ case .disconnected:
+ self.resultHandler(nil, nil, AWSAppSyncSubscriptionError.disconnected)
}
}
diff --git a/AWSAppSyncClient/AppSyncMQTTClient.swift b/AWSAppSyncClient/AppSyncMQTTClient.swift
index 7edf6a7..37ec609 100644
--- a/AWSAppSyncClient/AppSyncMQTTClient.swift
+++ b/AWSAppSyncClient/AppSyncMQTTClient.swift
@@ -32,24 +32,20 @@ class AppSyncMQTTClient: MQTTClientDelegate {
return
}
- if status.rawValue == 2 {
- for topic in topics {
- mqttClient.subscribe(toTopic: topic, qos: 1, extendedCallback: nil)
- }
- } else if status.rawValue >= 3 {
- let error = AWSAppSyncSubscriptionError(additionalInfo: "Subscription Terminated.", errorDetails: [
- "recoverySuggestion" : "Restart subscription request.",
- "failureReason" : "Disconnected from service."])
-
- topics.map({ self.topicSubscribersDictionary[$0] })
- .flatMap({$0})
- .flatMap({$0})
- .forEach({$0.disconnectCallbackDelegate(error: error)})
+ switch status {
+ case .connected:
+ topics.forEach({ mqttClient.subscribe(toTopic: $0, qos: 1, extendedCallback: nil) })
+ default:
+ break
}
+
+ topics.map({ self.topicSubscribersDictionary[$0] })
+ .flatMap({ $0 })
+ .flatMap({ $0 }).forEach({ $0.status = SubscritionWatcherStatus(status: status) })
}
}
- func addWatcher(watcher: MQTTSubscritionWatcher, topics: [String], identifier: Int) {
+ func addWatcher(watcher: MQTTSubscritionWatcher, topics: [String]) {
self.subscriptionsQueue.async { [weak self] in
guard let `self` = self else {
return
@@ -142,7 +138,7 @@ class AppSyncMQTTClient: MQTTClientDelegate {
usingCancelling subscription: MQTTSubscritionWatcher) -> [String: [MQTTSubscritionWatcher]] {
return topicSubscribersDictionary.reduce(into: [:]) { (result, element) in
- result[element.key] = removedSubscriber(array: element.value, of: subscription.getIdentifier())
+ result[element.key] = removedSubscriber(array: element.value, of: subscription)
}
}
@@ -164,7 +160,28 @@ class AppSyncMQTTClient: MQTTClientDelegate {
/// - array: [MQTTSubscritionWatcher]
/// - id: Int
/// - Returns: updated array [MQTTSubscritionWatcher]
- private func removedSubscriber(array: [MQTTSubscritionWatcher], of id: Int) -> [MQTTSubscritionWatcher] {
- return array.filter({$0.getIdentifier() != id })
+ private func removedSubscriber(array: [MQTTSubscritionWatcher], of subscription: MQTTSubscritionWatcher) -> [MQTTSubscritionWatcher] {
+ return array.filter({ $0 !== subscription })
+ }
+}
+
+internal extension SubscritionWatcherStatus {
+ internal init(status: MQTTStatus) {
+ switch status {
+ case .connecting:
+ self = .connecting
+ case .connected:
+ self = .connected
+ case .connectionError:
+ self = .connectionError
+ case .connectionRefused:
+ self = .connectionRefused
+ case .protocolError:
+ self = .protocolError
+ case .disconnected:
+ self = .disconnected
+ case .unknown:
+ self = .authenticated
+ }
}
}
--
2.17.1
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment