Skip to content

Instantly share code, notes, and snippets.

@bobwiller
Last active September 23, 2020 15:33
Show Gist options
  • Save bobwiller/b669f3476fcd2805b4f25b96243d859a to your computer and use it in GitHub Desktop.
Save bobwiller/b669f3476fcd2805b4f25b96243d859a to your computer and use it in GitHub Desktop.
Dart AppSync Subscription
var query = {
'operationName': 'OnNewPets',
'query': '''subscription OnNewPets {
onCreatePet {
id
name
description
}
}'''
};
var _endpoint = 'https://yourendpoint.appsync-api.us-east-1.amazonaws.com/graphql'
try {
http.post(
'$_endpoint',
headers: {
'Authorization': _session.getAccessToken().getJwtToken(), //_session comes from amazon_cognito_identity_dart package
'Content-Type': 'application/json',
},
body: json.encode(query),
)
.then((data) {
Map<String, dynamic> response =
jsonDecode(data.body) as Map<String, dynamic>;
mqtt.MqttClient _client;
mqtt.MqttConnectionState connectionState;
_client = mqtt.MqttClient(
response['extensions']['subscription']['mqttConnections'][0]['url']
.toString(),
response['extensions']['subscription']['mqttConnections'][0]['client']
.toString());
_client.logging(on: true);
_client.keepAlivePeriod = 300;
_client.useWebSocket = true;
_client.onDisconnected = () => print("Disconnected");
_client.onConnected = () => print("Connected");
_client.onSubscribed = (value) => print(value);
_client.port = 443;
try {
_client.connect().then((data) {
if (_client.connectionStatus.state == MqttConnectionState.connected) {
print('CONNECTED!');
try {
_client.subscribe(
response['extensions']['subscription']['mqttConnections'][0]
['topics'][0]
.toString(),
MqttQos.atMostOnce);
_client.updates
.listen((List<MqttReceivedMessage<MqttMessage>> c) {
final MqttPublishMessage recMess =
c[0].payload as MqttPublishMessage;
final String pt = MqttPublishPayload.bytesToStringAsString(
recMess.payload.message);
print(
'EXAMPLE::Change notification:: topic is <${c[0].topic}>, payload is <-- $pt -->');
print('');
});
_client.published.listen((MqttPublishMessage message) {
print(
'EXAMPLE::Published notification:: topic is ${message.variableHeader.topicName}, with Qos ${message.header.qos}');
});
} catch (error) {
print(error);
}
}
print(data);
});
} on Exception catch (e) {
print('EXAMPLE::client exception - $e');
_client.disconnect();
}
});
} catch (e) {
print(e);
}
@andrija78
Copy link

I've been using this code for some time, and recently AppSync subscription would randomly stop working for a day. What I found is - sometimes AppSync will maintain sevearal topics, and I would need to subscribe to topic 1, not topic 0 as this code does. This was also the reason it would randomly start working - when the old topic expired, code would start working again.

The solution - we need to grab the topic from "newSubscriptions" and find the right mqqt connection.

Something like this:

`

  var server;
  var clientIdentifier;
  var topic;
  topic=response['extensions']['subscription']['newSubscriptions']['onCreatePet']['topic'];

  for (var m in response['extensions']['subscription']['mqttConnections']) {

    if ((m['topics'] as List<dynamic>).contains(topic)) {
      server=m['url'];  
      clientIdentifier=m['client'];
      break;
    }
  }

  _client = MqttClient(
        server.toString(),
        clientIdentifier.toString()
      );

    await _client.connect(); 
    
    if (_client.connectionStatus.state == MqttConnectionState.connected) {
        _client.subscribe(
            topic.toString(),
            MqttQos.atMostOnce
        );
    }`

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment