Skip to content

Instantly share code, notes, and snippets.

@noahcoad
Last active January 24, 2020 17:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save noahcoad/5daa80eb3740496838ca530c841afaf0 to your computer and use it in GitHub Desktop.
Save noahcoad/5daa80eb3740496838ca530c841afaf0 to your computer and use it in GitHub Desktop.
Test AWS IoT Endpoint MQTT Connection
#
# Tests the connection to the AWS IoT Endpoint for a user's account
# by creating an AWS IoT Thing device, certificate, keys, and policy,
# then connecting to the AWS IoT Core Endpoint with those device credentials.
# Both port 8883 and port 443 are tested.
#
# Created on 2019-09-20 by Noah Coad, Amazon AWS ProServe IoT, ncoad@amazon.com
# All rights reserved.
#
# can also be used to create a new IoT device, or remove old ones
#
# pip install boto3 AWSIoTPythonSDK
# python3 test_aws_iot_connection.py
#
# also posted at
# https://gist.github.com/5daa80eb3740496838ca530c841afaf0
# gist -d "Test AWS IoT Endpoint MQTT Connection" test_aws_iot_connection.py
# gist -u 5daa80eb3740496838ca530c841afaf0 test_aws_iot_connection.py
# import libraries
import boto3, botocore.exceptions
import random, string, json, os, os.path, urllib.request, ssl, traceback
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
# main workflow
def main():
    """Run the end-to-end AWS IoT connectivity check.

    Steps: open a boto3 IoT client, print account/region/OpenSSL details,
    download the Amazon Root CA if it is not already present, look up the
    account's ATS data endpoint, create a temporary thing (certificate,
    keys, policy), test MQTT connectivity, save the configuration to a
    JSON file named after this script, and finally delete everything that
    was created.

    Returns None. Client-creation errors are reported on stdout and end
    the run early; errors in later steps propagate, but only after the
    created resources have been cleaned up.
    """
    # instantiate the iot client
    try:
        client = boto3.client('iot')
    except botocore.exceptions.NoRegionError as e:
        print("Error starting AWS connection.\nProbably need to specify your default AWS credentials in AWS credentials file.\nInstall the AWS CLI tool and run: aws configure\nWhen prompted for region, use us-west-2 or whichever region you want to use.\nMessage: %s" % str(e))
        return
    except Exception as e:
        print("Error starting AWS connection:\n%s" % str(e))
        return

    # show what account number we're working with
    print("AWS Account: %s" % boto3.client('sts').get_caller_identity().get('Account'))
    print("AWS Region: %s" % client.meta.region_name)
    print("OpenSSL Version: %s" % ssl.OPENSSL_VERSION)

    # download the AWS Root CA
    if not os.path.exists('root.ca'):
        print("Downloading Amazon Root CA")
        urllib.request.urlretrieve('https://www.amazontrust.com/repository/AmazonRootCA1.pem', 'root.ca')

    # get AWS IoT endpoint
    iot = {}
    iot['endpointAddress'] = client.describe_endpoint(endpointType = 'iot:Data-ATS').get('endpointAddress')
    print("AWS IoT Core Endpoint Address: %s" % iot['endpointAddress'])

    # name of file to load/save config
    savefile = os.path.splitext(os.path.basename(__file__))[0] + '.json'

    # create a new thing
    # (alternative: reuse a previously saved thing by loading it from savefile)
    iot['thing'] = create_thing(client)

    # From here on AWS resources exist, so cleanup must run even if a
    # later step raises; otherwise the thing/cert/policy would be leaked.
    try:
        # save needed files
        save_key_files(iot['thing'])

        # test MQTT connection with the thing
        test_aws_iot_connection(iot)

        # save thing settings
        # NOTE: the dump includes iot['thing']['keys'], i.e. the private key.
        with open(savefile, 'w') as file:
            json.dump(iot, file, indent = 4)
        # report only after the file has actually been written
        print("Saved configuration to: %s" % savefile)
    finally:
        # clean up
        print("Cleaning up by removing resources created")
        delete_thing(client, iot['thing'])
# create a new AWS IoT Device thing for testing
def create_thing(client):
    """Create a throwaway AWS IoT thing with its own certificate and policy.

    Args:
        client: boto3 IoT client (e.g. ``boto3.client('iot')``).

    Returns:
        dict describing what was created, with keys:
        ``salt`` (8 random alphanumeric characters), ``name`` (thing name),
        ``keys`` (certificateArn, certificateId, certificatePem, keyPair)
        and ``policyName``.

    Raises:
        RuntimeError: if the create_thing response does not report HTTP 200.
    """
    # track resources in a dict
    thing = {}

    # create a unique random salt for this device
    thing['salt'] = ''.join(random.choices(string.ascii_letters + string.digits, k=8))

    # create a unique device thing name
    thing['name'] = 'connection_test_%s_thing' % thing['salt']
    print("AWS IoT Device ThingName: %s" % thing['name'])

    # create a new IoT thing
    response = client.create_thing(thingName = thing['name'])

    # make sure it was a successful call; an explicit raise (rather than
    # assert) keeps the check alive when Python runs with -O
    if response['ResponseMetadata']['HTTPStatusCode'] != 200:
        raise RuntimeError("Creating thing failed.\n%s" % response)

    # create device keys and cert, keeping only the fields needed later
    response = client.create_keys_and_certificate(setAsActive = True)
    wanted = ('certificateArn', 'certificateId', 'certificatePem', 'keyPair')
    thing['keys'] = {k: v for (k, v) in response.items() if k in wanted}

    # create an IoT policy allowing connect/publish/subscribe/receive on all resources
    thing['policyName'] = 'connection_test_%s_policy' % thing['salt']
    doc = '{"Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["iot:Publish", "iot:Subscribe", "iot:Connect", "iot:Receive"], "Resource": ["*"]}]}'
    client.create_policy(policyName = thing['policyName'], policyDocument = doc)

    # attach policy to certificate
    client.attach_policy(policyName = thing['policyName'], target = thing['keys']['certificateArn'])

    # attach certificate to thing
    client.attach_thing_principal(thingName = thing['name'], principal = thing['keys']['certificateArn'])

    # return information about the thing created
    return thing
# save key files
def save_key_files(thing):
    """Write the thing's certificate and key pair to files in the current directory.

    Args:
        thing: dict as returned by create_thing; reads
            ``thing['keys']['certificatePem']`` and the ``PublicKey`` /
            ``PrivateKey`` entries of ``thing['keys']['keyPair']``.

    Creates (or overwrites) ``dev_cert.pem``, ``dev_public.key`` and
    ``dev_private.key``.
    """
    keys = thing['keys']
    outputs = (
        ('dev_cert.pem', keys['certificatePem']),
        ('dev_public.key', keys['keyPair']['PublicKey']),
        ('dev_private.key', keys['keyPair']['PrivateKey']),
    )
    for filename, content in outputs:
        # 'with' guarantees the data is flushed and the handle closed
        # before the MQTT client later reads these files
        with open(filename, 'w') as handle:
            handle.write(content)
# test AWS IoT Connection
def test_aws_iot_connection(iot):
    """Try an MQTT connection to the IoT endpoint on ports 443 and 8883.

    Args:
        iot: dict containing ``endpointAddress`` (the endpoint host name).

    Uses ``root.ca``, ``dev_private.key`` and ``dev_cert.pem`` from the
    current directory as credentials. For each port a success or error
    line is printed; on failure the traceback is written to
    ``error<port>.log``. Connection failures are reported, not raised.
    """
    # create MQTT client
    mqtt = AWSIoTMQTTClient(clientID = "")

    # configure certs
    mqtt.configureCredentials('root.ca', 'dev_private.key', 'dev_cert.pem')

    # try both ports 443 and 8883
    print("")
    for port in [443, 8883]:
        mqtt.configureEndpoint(iot['endpointAddress'], port)
        try:
            mqtt.connect()
        except Exception:
            # Record the failure first so the log exists no matter what
            # happens afterwards.
            with open('error%s.log' % port, 'w') as log:
                log.write(traceback.format_exc())
            print("** Error connecting to AWS IoT Core on port %s. See error%s.log for full message." % (port, port))
            # Best-effort reset: disconnect() on a client that never
            # connected may itself raise, which must not stop the
            # remaining port from being tested.
            try:
                mqtt.disconnect()
            except Exception:
                pass
        else:
            mqtt.disconnect()
            print("** Successfully connected to AWS IoT Core on port %s" % port)
    print("")
# clean up
def delete_thing(client, thing):
    """Delete the AWS IoT resources and local files created for a test thing.

    Args:
        client: boto3 IoT client.
        thing: dict as returned by create_thing (reads ``name``,
            ``policyName``, ``keys['certificateArn']`` and
            ``keys['certificateId']``).

    The certificate is detached from the thing and the policy, deactivated
    and deleted; then the thing and the policy are deleted. The local files
    ``root.ca``, ``dev_private.key``, ``dev_public.key`` and
    ``dev_cert.pem`` are removed even when one of the AWS calls raises;
    the exception still propagates afterwards.
    """
    try:
        cert_arn = thing['keys']['certificateArn']
        cert_id = thing['keys']['certificateId']
        client.detach_thing_principal(thingName = thing['name'], principal = cert_arn)
        client.detach_policy(policyName = thing['policyName'], target = cert_arn)
        # a certificate has to be made inactive before it is deleted
        client.update_certificate(newStatus = 'INACTIVE', certificateId = cert_id)
        client.delete_certificate(certificateId = cert_id)
        client.delete_thing(thingName = thing['name'])
        client.delete_policy(policyName = thing['policyName'])
    finally:
        # never leave key material on disk, even after a failed AWS call
        for path in ['root.ca', 'dev_private.key', 'dev_public.key', 'dev_cert.pem']:
            try:
                os.remove(path)
            except FileNotFoundError:
                pass
# if running from command line
if __name__ == "__main__":
    # only start the workflow when executed as a script, so that importing
    # this module does not trigger it
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment