Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Example of how to publish DS18B20 temperature sensor values from a Raspberry Pi to AWS IoT
#!/usr/bin/env python3
import argparse
import configparser
import datetime
import json
import time
from glob import glob
from os.path import join
from awscrt import io, mqtt
from awsiot import mqtt_connection_builder
def on_connection_interrupted(connection, error, **kwargs):
    """Report on stdout that the MQTT connection was interrupted.

    Registered as the ``on_connection_interrupted`` callback when the MQTT
    connection is built.

    Args:
        connection: The connection that was interrupted (unused).
        error: The error describing the interruption; included in the message.
        **kwargs: Any additional keyword arguments from the caller (ignored).
    """
    notice = f"Connection interrupted. error: {error}"
    print(notice)
def on_connection_resumed(connection, return_code, session_present, **kwargs):
    """Report on stdout that the MQTT connection was re-established.

    Registered as the ``on_connection_resumed`` callback when the MQTT
    connection is built.

    Args:
        connection: The connection that resumed (unused).
        return_code: The return code reported for the reconnect; printed as-is.
        session_present: Session-present flag reported for the reconnect;
            printed as-is.
        **kwargs: Any additional keyword arguments from the caller (ignored).
    """
    status_line = f"Connection resumed. return_code: {return_code} session_present: {session_present}"
    print(status_line)
# Command-line interface: a single optional flag naming the INI file that
# holds the AWS IoT connection settings.
parser = argparse.ArgumentParser(
    description="Connect and register a new device to the AWS IoT platform",
)
config_option = {
    "type": str,
    "default": "perm_config.ini",
    "metavar": "ConnectionConfigFile",
    "help": "File defining the path where cert files are stored, cert filenames, and an IoT endpoint",
}
parser.add_argument("-c", "--config", **config_option)
args = parser.parse_args()
# Load the connection settings from the INI file named on the command line.
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed, so an empty result means nothing was loaded.
# Fail here with a clear message instead of a later KeyError on "SETTINGS".
if not config.read(args.config):
    raise SystemExit(f"Could not read configuration file '{args.config}'")
if "SETTINGS" not in config:
    raise SystemExit(f"Configuration file '{args.config}' has no [SETTINGS] section")
settings = config["SETTINGS"]

# Report every missing key at once rather than one KeyError per run.
required_settings = (
    "IOT_ENDPOINT",
    "DEVICE_BUILDING_NAME",
    "DEVICE_LOCATION",
    "SECURE_CERT_PATH",
    "ROOT_CERT",
    "SECURE_KEY",
    "CLAIM_CERT",
)
missing_settings = [key for key in required_settings if key not in settings]
if missing_settings:
    raise SystemExit(
        f"Configuration file '{args.config}' is missing settings: {', '.join(missing_settings)}"
    )

iot_endpoint = settings["IOT_ENDPOINT"]
device_building_name = settings["DEVICE_BUILDING_NAME"]
device_location = settings["DEVICE_LOCATION"]

# All certificate/key files live in the same directory.
secure_cert_dir = settings["SECURE_CERT_PATH"]
root_cert_path = join(secure_cert_dir, settings["ROOT_CERT"])
private_key_path = join(secure_cert_dir, settings["SECURE_KEY"])
cert_path = join(secure_cert_dir, settings["CLAIM_CERT"])
# Read this host's machine ID (first line of /etc/machine-id, trailing
# whitespace removed); it is used below to build the MQTT client ID.
with open("/etc/machine-id") as machine_id_file:
    first_line = machine_id_file.readline()
machine_uuid = first_line.rstrip()
# Networking primitives the AWS CRT client needs: an event loop group, a host
# resolver running on it, and the client bootstrap that ties the two together.
event_loop_group = io.EventLoopGroup(1)
host_resolver = io.DefaultHostResolver(event_loop_group)
client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)

# The client ID is derived from the machine ID so it is stable across restarts
# of this script on the same host.
client_id = f"sensor_{machine_uuid}"

# Mutual-TLS connection settings: the device certificate and private key
# authenticate this client, and the root CA file is used for the endpoint.
connection_options = {
    "endpoint": iot_endpoint,
    "cert_filepath": cert_path,
    "pri_key_filepath": private_key_path,
    "client_bootstrap": client_bootstrap,
    "ca_filepath": root_cert_path,
    "client_id": client_id,
    "on_connection_interrupted": on_connection_interrupted,
    "on_connection_resumed": on_connection_resumed,
    "clean_session": False,
    "keep_alive_secs": 6,
}
mqtt_connection = mqtt_connection_builder.mtls_from_path(**connection_options)
# io.init_logging(io.LogLevel.Info, 'stderr')
# Build the concrete publish topic by filling in the placeholders used in the
# configured IOT_TOPIC template with this device's values. The replacements
# are applied one after another, in the order listed.
topic_substitutions = (
    (r"${iot:Connection.Thing.ThingName}", client_id),
    ("${iot:Connection.Thing.Attributes[BuildingName]}", device_building_name),
    ("${iot:Connection.Thing.Attributes[Location]}", device_location),
)
sensor_topic = config["SETTINGS"]["IOT_TOPIC"]
for placeholder, replacement in topic_substitutions:
    sensor_topic = sensor_topic.replace(placeholder, replacement)
def parse_w1_temperature_c(lines):
    """Extract the temperature in degrees Celsius from ``w1_slave`` output.

    Args:
        lines: The lines read from the sensor's ``w1_slave`` file. The first
            line is expected to end in ``YES`` when the reading passed its CRC
            check, and the second to end in ``t=<millidegrees Celsius>``.

    Returns:
        The unrounded temperature as a float, or ``None`` when the CRC check
        did not pass or the reading is missing or malformed.
    """
    if len(lines) < 2 or not lines[0].rstrip().endswith("YES"):
        return None
    _, marker, millidegrees = lines[1].rpartition("t=")
    if not marker:
        return None
    try:
        return float(millidegrees) / 1000
    except ValueError:
        return None


print(f"Connecting to {iot_endpoint} with client ID '{client_id}'...")
connect_future = mqtt_connection.connect()
connect_future.result()  # Wait for the connection attempt to finish; raises if it failed
print("Connected!")

wait_interval = 5.0
try:
    # Use the first matching 1-Wire device directory; fail with a clear
    # message (rather than an IndexError) when no sensor is attached.
    sensor_dirs = glob("/sys/bus/w1/devices/28-*")
    if not sensor_dirs:
        raise SystemExit("No 1-Wire temperature sensor found matching /sys/bus/w1/devices/28-*")
    temperature_file = join(sensor_dirs[0], "w1_slave")

    # Sync program to start running every 'wait_interval' seconds based on system clock
    time.sleep(wait_interval - time.time() % wait_interval)
    while True:
        time_point = int(time.time())
        try:
            with open(temperature_file) as infile:
                temp_c = parse_w1_temperature_c(infile.readlines())
        except OSError as error:
            # A transient read failure should not stop the publisher.
            print(f"Could not read '{temperature_file}': {error}")
            temp_c = None

        if temp_c is None:
            # Never publish a value that failed its CRC check or could not be parsed.
            print("Skipping sample: no valid temperature reading")
        else:
            # Convert from the unrounded Celsius value so rounding happens only once.
            temp_f = round(temp_c * 9 / 5 + 32, 1)
            # Same "YYYY-MM-DD HH:MM:SS" text that str(utcfromtimestamp(...))
            # produced, without the deprecated naive-UTC constructor.
            timestamp_utc = datetime.datetime.fromtimestamp(
                time_point, tz=datetime.timezone.utc
            ).strftime("%Y-%m-%d %H:%M:%S")
            message = {
                "ThingName": client_id,
                "TimestampUTC": timestamp_utc,
                "TimestampEpoch": time_point,
                "TempF": temp_f,
                "BuildingName": device_building_name,
                "Location": device_location,
            }
            print(f"Publishing message to topic '{sensor_topic}': {message}")
            mqtt_connection.publish(
                topic=sensor_topic,
                payload=json.dumps(message),
                qos=mqtt.QoS.AT_LEAST_ONCE)

        # Capture temperature every 'wait_interval' seconds, in sync with system clock
        time.sleep(wait_interval - time.time() % wait_interval)
finally:
    # Close the MQTT connection cleanly however the loop ends (Ctrl+C, error, exit).
    mqtt_connection.disconnect().result()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment