Created
June 16, 2020 15:18
-
-
Save doit-mattporter/c5d3c4c62c9b1244ccd9c676aada949a to your computer and use it in GitHub Desktop.
Example of how to publish messages to an AWS IoT topic and print those messages back via a subscription to that topic
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import argparse | |
import configparser | |
import json | |
import random | |
import sys | |
from os.path import join | |
from time import sleep | |
from awscrt import io, mqtt | |
from awsiot import mqtt_connection_builder | |
def on_connection_interrupted(connection, error, **kwargs): | |
print(f"Connection interrupted. error: {error}") | |
def on_connection_resumed(connection, return_code, session_present, **kwargs): | |
print(f"Connection resumed. return_code: {return_code} session_present: {session_present}") | |
if return_code == mqtt.ConnectReturnCode.ACCEPTED and not session_present: | |
print("Session did not persist. Resubscribing to existing topics...") | |
resubscribe_future, _ = connection.resubscribe_existing_topics() | |
resubscribe_future.add_done_callback(on_resubscribe_complete) | |
def on_resubscribe_complete(resubscribe_future): | |
resubscribe_results = resubscribe_future.result() | |
print(f"Resubscribe results: {resubscribe_results}") | |
for resub_topic, qos in resubscribe_results["topics"]: | |
if qos is None: | |
sys.exit(f"Server rejected resubscribe to topic: {resub_topic}") | |
def on_message_received(topic, payload, **kwargs): | |
payload = payload.decode("UTF-8") | |
print(f"Received message from topic '{topic}': {payload}") | |
# Parse arguments for AWS IoT connection configuration values | |
parser = argparse.ArgumentParser(description="Connect and register a new device to the AWS IoT platform") | |
parser.add_argument("-c", "--config", | |
type=str, | |
default="perm_config.ini", | |
metavar="ConnectionConfigFile", | |
help="File defining the path where cert files are stored, cert filenames, and an IoT endpoint") | |
args = parser.parse_args() | |
# Parse connection configuration values | |
config = configparser.ConfigParser() | |
config.read(args.config) | |
iot_endpoint = config["SETTINGS"]["IOT_ENDPOINT"] | |
device_building_name = config["SETTINGS"]["DEVICE_BUILDING_NAME"] | |
device_location = config["SETTINGS"]["DEVICE_LOCATION"] | |
root_cert_path = join(config["SETTINGS"]["SECURE_CERT_PATH"], config["SETTINGS"]["ROOT_CERT"]) | |
private_key_path = join(config["SETTINGS"]["SECURE_CERT_PATH"], config["SETTINGS"]["SECURE_KEY"]) | |
cert_path = join(config["SETTINGS"]["SECURE_CERT_PATH"], config["SETTINGS"]["CLAIM_CERT"]) | |
# Gather the UUID machine-id from your Linux device | |
with open("/etc/machine-id") as file: | |
machine_uuid = file.readline().rstrip() | |
event_loop_group = io.EventLoopGroup(1) | |
host_resolver = io.DefaultHostResolver(event_loop_group) | |
client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver) | |
client_id = f"sensor_{machine_uuid}" | |
mqtt_connection = mqtt_connection_builder.mtls_from_path( | |
endpoint=iot_endpoint, | |
cert_filepath=cert_path, | |
pri_key_filepath=private_key_path, | |
client_bootstrap=client_bootstrap, | |
ca_filepath=root_cert_path, | |
client_id=client_id, | |
on_connection_interrupted=on_connection_interrupted, | |
on_connection_resumed=on_connection_resumed, | |
clean_session=False, | |
keep_alive_secs=6) | |
# io.init_logging(io.LogLevel.Info, 'stderr') | |
sensor_topic = config["SETTINGS"]["IOT_TOPIC"].replace(r"${iot:Connection.Thing.ThingName}", client_id) | |
sensor_topic = sensor_topic.replace("${iot:Connection.Thing.Attributes[BuildingName]}", device_building_name) | |
sensor_topic = sensor_topic.replace("${iot:Connection.Thing.Attributes[Location]}", device_location) | |
print(f"Connecting to {iot_endpoint} with client ID '{client_id}'...") | |
connect_future = mqtt_connection.connect() | |
connect_future.result() | |
print("Connected!") | |
# Subscribe to the topic to verify that messages are being published | |
print(f"Subscribing to topic '{sensor_topic}' ...") | |
subscribe_future, packet_id = mqtt_connection.subscribe( | |
topic=sensor_topic, | |
qos=mqtt.QoS.AT_LEAST_ONCE, | |
callback=on_message_received) | |
subscribe_result = subscribe_future.result() | |
print(f"Subscribed with {str(subscribe_result)}") | |
while True: | |
message = {"TempF": str(random.randint(70, 95))} # Publish random simulated room temperatures in Fahrenheit | |
print(f"Publishing message to topic '{sensor_topic}': {message}") | |
mqtt_connection.publish( | |
topic=sensor_topic, | |
payload=json.dumps(message), | |
qos=mqtt.QoS.AT_LEAST_ONCE) | |
sleep(1) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment