Skip to content

Instantly share code, notes, and snippets.

@doit-mattporter
Created June 16, 2020 15:18
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save doit-mattporter/c5d3c4c62c9b1244ccd9c676aada949a to your computer and use it in GitHub Desktop.
Save doit-mattporter/c5d3c4c62c9b1244ccd9c676aada949a to your computer and use it in GitHub Desktop.
Example of how to publish messages to an AWS IoT topic and print those messages back via a subscription to that topic
#!/usr/bin/env python3
import argparse
import configparser
import json
import random
import sys
from os.path import join
from time import sleep
from awscrt import io, mqtt
from awsiot import mqtt_connection_builder
def on_connection_interrupted(connection, error, **kwargs):
print(f"Connection interrupted. error: {error}")
def on_connection_resumed(connection, return_code, session_present, **kwargs):
print(f"Connection resumed. return_code: {return_code} session_present: {session_present}")
if return_code == mqtt.ConnectReturnCode.ACCEPTED and not session_present:
print("Session did not persist. Resubscribing to existing topics...")
resubscribe_future, _ = connection.resubscribe_existing_topics()
resubscribe_future.add_done_callback(on_resubscribe_complete)
def on_resubscribe_complete(resubscribe_future):
resubscribe_results = resubscribe_future.result()
print(f"Resubscribe results: {resubscribe_results}")
for resub_topic, qos in resubscribe_results["topics"]:
if qos is None:
sys.exit(f"Server rejected resubscribe to topic: {resub_topic}")
def on_message_received(topic, payload, **kwargs):
payload = payload.decode("UTF-8")
print(f"Received message from topic '{topic}': {payload}")
# Parse arguments for AWS IoT connection configuration values
parser = argparse.ArgumentParser(description="Connect and register a new device to the AWS IoT platform")
parser.add_argument("-c", "--config",
type=str,
default="perm_config.ini",
metavar="ConnectionConfigFile",
help="File defining the path where cert files are stored, cert filenames, and an IoT endpoint")
args = parser.parse_args()
# Parse connection configuration values
config = configparser.ConfigParser()
config.read(args.config)
iot_endpoint = config["SETTINGS"]["IOT_ENDPOINT"]
device_building_name = config["SETTINGS"]["DEVICE_BUILDING_NAME"]
device_location = config["SETTINGS"]["DEVICE_LOCATION"]
root_cert_path = join(config["SETTINGS"]["SECURE_CERT_PATH"], config["SETTINGS"]["ROOT_CERT"])
private_key_path = join(config["SETTINGS"]["SECURE_CERT_PATH"], config["SETTINGS"]["SECURE_KEY"])
cert_path = join(config["SETTINGS"]["SECURE_CERT_PATH"], config["SETTINGS"]["CLAIM_CERT"])
# Gather the UUID machine-id from your Linux device
with open("/etc/machine-id") as file:
machine_uuid = file.readline().rstrip()
event_loop_group = io.EventLoopGroup(1)
host_resolver = io.DefaultHostResolver(event_loop_group)
client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
client_id = f"sensor_{machine_uuid}"
mqtt_connection = mqtt_connection_builder.mtls_from_path(
endpoint=iot_endpoint,
cert_filepath=cert_path,
pri_key_filepath=private_key_path,
client_bootstrap=client_bootstrap,
ca_filepath=root_cert_path,
client_id=client_id,
on_connection_interrupted=on_connection_interrupted,
on_connection_resumed=on_connection_resumed,
clean_session=False,
keep_alive_secs=6)
# io.init_logging(io.LogLevel.Info, 'stderr')
sensor_topic = config["SETTINGS"]["IOT_TOPIC"].replace(r"${iot:Connection.Thing.ThingName}", client_id)
sensor_topic = sensor_topic.replace("${iot:Connection.Thing.Attributes[BuildingName]}", device_building_name)
sensor_topic = sensor_topic.replace("${iot:Connection.Thing.Attributes[Location]}", device_location)
print(f"Connecting to {iot_endpoint} with client ID '{client_id}'...")
connect_future = mqtt_connection.connect()
connect_future.result()
print("Connected!")
# Subscribe to the topic to verify that messages are being published
print(f"Subscribing to topic '{sensor_topic}' ...")
subscribe_future, packet_id = mqtt_connection.subscribe(
topic=sensor_topic,
qos=mqtt.QoS.AT_LEAST_ONCE,
callback=on_message_received)
subscribe_result = subscribe_future.result()
print(f"Subscribed with {str(subscribe_result)}")
while True:
message = {"TempF": str(random.randint(70, 95))} # Publish random simulated room temperatures in Fahrenheit
print(f"Publishing message to topic '{sensor_topic}': {message}")
mqtt_connection.publish(
topic=sensor_topic,
payload=json.dumps(message),
qos=mqtt.QoS.AT_LEAST_ONCE)
sleep(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment