Last active
January 24, 2020 17:06
-
-
Save noahcoad/5daa80eb3740496838ca530c841afaf0 to your computer and use it in GitHub Desktop.
Test AWS IoT Endpoint MQTT Connection
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#
# Tests the connection to the AWS IoT endpoint for a user's account
# by creating an AWS IoT Thing device, certificate, keys, and policy,
# then connecting to the AWS IoT Core endpoint with those device credentials.
# Tests both port 8883 and port 443.
#
# Created on 2019-09-20 by Noah Coad, Amazon AWS ProServe IoT, ncoad@amazon.com
# All rights reserved.
#
# Can also be used to create a new IoT device, or to remove old ones.
#
# pip install boto3 AWSIoTPythonSDK
# python3 test_aws_iot_connection.py
#
# Also posted at
# https://gist.github.com/5daa80eb3740496838ca530c841afaf0
# gist -d "Test AWS IoT Endpoint MQTT Connection" test_aws_iot_connection.py
# gist -u 5daa80eb3740496838ca530c841afaf0 test_aws_iot_connection.py
# import libraries
import json
import os
import os.path
import random
import ssl
import string
import traceback
import urllib.request

import boto3
import botocore.exceptions
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
# main workflow
def main():
    """Run the end-to-end AWS IoT connectivity test.

    Steps, in order:
      1. Create a boto3 ``iot`` client (prints a hint and returns early if
         that fails).
      2. Print the AWS account, region and local OpenSSL version.
      3. Download the Amazon Root CA to ``root.ca`` if it is not already there.
      4. Look up the account's ``iot:Data-ATS`` endpoint address.
      5. Create a throw-away thing/certificate/policy via ``create_thing``.
      6. Write the device credentials to disk, try an MQTT connection on each
         port, and dump the collected settings to ``<script name>.json``.
      7. Always delete the resources created in step 5, even if step 6 raised.
    """
    # instantiate the iot client
    try:
        client = boto3.client('iot')
    except botocore.exceptions.NoRegionError as e:
        print(
            "Error starting AWS connection.\n"
            "Probably need to specify your default AWS credentials in AWS credentials file.\n"
            "Install the AWS CLI tool and run: aws configure\n"
            "When prompted for region, use us-west-2 or whichever region you want to use.\n"
            "Message: %s" % str(e)
        )
        return
    except Exception as e:
        print("Error starting AWS connection:\n%s" % str(e))
        return

    # show what account number we're working with
    print("AWS Account: %s" % boto3.client('sts').get_caller_identity().get('Account'))
    print("AWS Region: %s" % client.meta.region_name)
    print("OpenSSL Version: %s" % ssl.OPENSSL_VERSION)

    # download the AWS Root CA
    if not os.path.exists('root.ca'):
        print("Downloading Amazon Root CA")
        urllib.request.urlretrieve('https://www.amazontrust.com/repository/AmazonRootCA1.pem', 'root.ca')

    # get AWS IoT endpoint
    iot = {}
    iot['endpointAddress'] = client.describe_endpoint(endpointType='iot:Data-ATS').get('endpointAddress')
    print("AWS IoT Core Endpoint Address: %s" % iot['endpointAddress'])

    # name of file to load/save config
    savefile = os.path.splitext(os.path.basename(__file__))[0] + '.json'

    # create a new thing
    iot['thing'] = create_thing(client)

    try:
        # save needed files
        save_key_files(iot['thing'])

        # test MQTT connection with the thing
        test_aws_iot_connection(iot)

        # save thing settings; note this includes the key pair returned by
        # create_thing, so the JSON file holds the device's private key
        with open(savefile, 'w') as fh:
            json.dump(iot, fh, indent=4)
        # report only after the file has actually been written
        print("Saved configuration to: %s" % savefile)
    finally:
        # clean up in all cases so a failed test does not leave the thing,
        # certificate and policy behind in the account
        print("Cleaning up by removing resources created")
        delete_thing(client, iot['thing'])
# create a new AWS IoT Device thing for testing
def create_thing(client):
    """Create a thing, an active certificate/key pair and a policy, all linked.

    Args:
        client: a boto3 ``iot`` client (or any object exposing the same
            ``create_thing``, ``create_keys_and_certificate``,
            ``create_policy``, ``attach_policy`` and
            ``attach_thing_principal`` methods).

    Returns:
        dict with the keys:
          ``salt``        8 random alphanumeric characters used in the names,
          ``name``        the thing name,
          ``policyName``  the policy name,
          ``keys``        the ``certificateArn``, ``certificateId``,
                          ``certificatePem`` and ``keyPair`` entries of the
                          ``create_keys_and_certificate`` response.

    Raises:
        AssertionError: if the ``create_thing`` call does not report HTTP 200.
    """
    # track resources in a dict
    thing = {}

    # create a unique random salt for this device
    thing['salt'] = ''.join(random.choices(string.ascii_letters + string.digits, k=8))

    # create a unique device thing name
    thing['name'] = 'connection_test_%s_thing' % thing['salt']
    print("AWS IoT Device ThingName: %s" % thing['name'])

    # create a new IoT thing
    response = client.create_thing(thingName=thing['name'])

    # make sure it was a successful call; raised explicitly (instead of using
    # an assert statement) so the check still runs under `python -O`
    if response['ResponseMetadata']['HTTPStatusCode'] != 200:
        raise AssertionError("Creating thing failed.\n%s" % response)

    # create device keys and cert, keeping only the fields needed later
    response = client.create_keys_and_certificate(setAsActive=True)
    wanted = ('certificateArn', 'certificateId', 'certificatePem', 'keyPair')
    thing['keys'] = {k: v for (k, v) in response.items() if k in wanted}

    # create an IoT policy
    thing['policyName'] = 'connection_test_%s_policy' % thing['salt']
    doc = '{"Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["iot:Publish", "iot:Subscribe", "iot:Connect", "iot:Receive"], "Resource": ["*"]}]}'
    client.create_policy(policyName=thing['policyName'], policyDocument=doc)

    # attach policy to certificate
    client.attach_policy(policyName=thing['policyName'], target=thing['keys']['certificateArn'])

    # attach certificate to thing
    client.attach_thing_principal(thingName=thing['name'], principal=thing['keys']['certificateArn'])

    # return information about the thing created
    return thing
# save key files
def save_key_files(thing):
    """Write the device certificate and key pair to the current directory.

    Args:
        thing: dict as returned by ``create_thing``; reads
            ``thing['keys']['certificatePem']`` and the ``PublicKey`` /
            ``PrivateKey`` entries of ``thing['keys']['keyPair']``.

    Creates (or overwrites) ``dev_cert.pem``, ``dev_public.key`` and
    ``dev_private.key``.
    """
    keys = thing['keys']
    contents = (
        ('dev_cert.pem', keys['certificatePem']),
        ('dev_public.key', keys['keyPair']['PublicKey']),
        ('dev_private.key', keys['keyPair']['PrivateKey']),
    )
    for filename, text in contents:
        # use a context manager so each handle is flushed and closed
        # before the MQTT client reads the file
        with open(filename, 'w') as fh:
            fh.write(text)
# test AWS IoT Connection
def test_aws_iot_connection(iot):
    """Try an MQTT connection to the IoT endpoint on ports 443 and 8883.

    Args:
        iot: dict with an ``endpointAddress`` entry naming the host to
            connect to.

    Uses the credential files ``root.ca``, ``dev_private.key`` and
    ``dev_cert.pem`` from the current directory. For each port a success or
    failure line is printed; on failure the traceback is written to
    ``error<port>.log``. A failure on one port does not stop the next port
    from being tested.
    """
    # create MQTT client
    mqtt = AWSIoTMQTTClient(clientID = "")

    # configure certs
    mqtt.configureCredentials('root.ca', 'dev_private.key', 'dev_cert.pem')

    # try both ports 443 and 8883
    print("")
    for port in [443, 8883]:
        mqtt.configureEndpoint(iot['endpointAddress'], port)
        try:
            mqtt.connect()
        except Exception:
            # record the failure first, so the log exists even if the
            # disconnect below also fails
            with open('error%s.log' % port, 'w') as log:
                log.write(traceback.format_exc())
            print("** Error connecting to AWS IoT Core on port %s. See error%s.log for full message." % (port, port))
            # best-effort reset of the client; disconnecting a client whose
            # connect just failed may itself fail, and that must not abort
            # the test of the remaining port
            try:
                mqtt.disconnect()
            except Exception:
                pass
        else:
            mqtt.disconnect()
            print("** Successfully connected to AWS IoT Core on port %s" % port)
    print("")
# clean up
def delete_thing(client, thing):
    """Delete the AWS IoT resources and local files created for the test.

    Args:
        client: a boto3 ``iot`` client (or any object exposing the same
            methods called below).
        thing: dict as returned by ``create_thing``; reads ``name``,
            ``policyName`` and the ``certificateArn`` / ``certificateId``
            entries of ``keys``.

    The AWS calls run in this order: detach the certificate from the thing,
    detach the policy from the certificate, mark the certificate INACTIVE,
    delete the certificate, delete the thing, delete the policy. The local
    files ``root.ca``, ``dev_private.key``, ``dev_public.key`` and
    ``dev_cert.pem`` are removed afterwards, including when one of the AWS
    calls raises (the exception still propagates).
    """
    try:
        cert_arn = thing['keys']['certificateArn']
        cert_id = thing['keys']['certificateId']
        client.detach_thing_principal(thingName=thing['name'], principal=cert_arn)
        client.detach_policy(policyName=thing['policyName'], target=cert_arn)
        # the certificate is deactivated before it is deleted
        client.update_certificate(newStatus='INACTIVE', certificateId=cert_id)
        client.delete_certificate(certificateId=cert_id)
        client.delete_thing(thingName=thing['name'])
        client.delete_policy(policyName=thing['policyName'])
    finally:
        # always remove the credential files so the private key does not
        # linger on disk after a failed cleanup
        for path in ['root.ca', 'dev_private.key', 'dev_public.key', 'dev_cert.pem']:
            if os.path.exists(path):
                os.remove(path)
# if running from command line
if __name__ == '__main__':
    # only run the workflow when executed as a script, not when imported
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment