Skip to content

Instantly share code, notes, and snippets.

@doit-mattporter
Created December 15, 2020 00:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save doit-mattporter/7e548158a5dc03b3674282cda19ef663 to your computer and use it in GitHub Desktop.
Save doit-mattporter/7e548158a5dc03b3674282cda19ef663 to your computer and use it in GitHub Desktop.
Stream real-time Raspberry Pi-derived (or simulated) temperature values to GCP IoT Core
#!/usr/bin/env python3
import argparse
import configparser
import datetime
import json
import jwt
import paho.mqtt.client as mqtt
import random
import ssl
import time
from glob import glob
from os.path import join
# Lifetime, in minutes, given to each JWT built by create_jwt(). The publish loop in
# stream_temperature_values() compares the token's age against this value to decide
# when to reconnect with a fresh token.
jwt_exp_mins = 60
def create_jwt(project_id, private_key_file):
    """Create a JWT (https://jwt.io) to establish an MQTT connection.

    Args:
        project_id: GCP project ID, stored in the token's ``aud`` claim.
        private_key_file: Path to the private key file used to sign the
            token with the ES256 algorithm.

    Returns:
        The encoded token, exactly as returned by ``jwt.encode``.

    Raises:
        OSError: If ``private_key_file`` cannot be opened or read.
    """
    # Read the clock once so 'exp' is exactly jwt_exp_mins after 'iat'. A
    # timezone-aware UTC timestamp replaces the naive utcnow(), which is
    # deprecated since Python 3.12.
    issued_at = datetime.datetime.now(tz=datetime.timezone.utc)
    token = {
        'iat': issued_at,
        'exp': issued_at + datetime.timedelta(minutes=jwt_exp_mins),
        'aud': project_id,
    }
    with open(private_key_file, 'r') as f:
        private_key = f.read()
    print(f"Creating JWT using ES256 from private key file {private_key_file}")
    return jwt.encode(token, private_key, algorithm="ES256")
def error_str(rc):
    """Render a Paho result code as ``"<code>: <description>"``."""
    description = mqtt.error_string(rc)
    return f"{rc}: {description}"
def get_iot_device_client(project_id, region, registry_id, device_id, private_key_path, root_cert_path,
                          mqtt_bridge_hostname="mqtt.googleapis.com", mqtt_bridge_port=8883):
    """Establish an IoT Core-connected client.

    Args:
        project_id: GCP project ID; part of the MQTT client ID and the JWT audience.
        region: Cloud region of the device registry; part of the MQTT client ID.
        registry_id: Device registry ID; part of the MQTT client ID.
        device_id: Device ID; part of the MQTT client ID.
        private_key_path: Path to the private key used to sign the JWT password.
        root_cert_path: Path to the CA certificate file used for TLS.
        mqtt_bridge_hostname: MQTT bridge host to connect to. Defaults to the
            host this script has always used.
        mqtt_bridge_port: MQTT bridge port to connect to. Defaults to 8883.

    Returns:
        A ``(device, client)`` tuple: a fresh ``Device`` whose callbacks are
        registered on the client, and the ``mqtt.Client`` itself.
    """
    device = Device()
    client_id = f"projects/{project_id}/locations/{region}/registries/{registry_id}/devices/{device_id}"
    client = mqtt.Client(client_id=client_id)

    # The username is a placeholder; the freshly signed JWT is sent as the password.
    client.username_pw_set(username="unused",
                           password=create_jwt(project_id, private_key_path))
    client.tls_set(ca_certs=root_cert_path, tls_version=ssl.PROTOCOL_TLSv1_2)

    # Connection state and publish acknowledgements are tracked on the Device.
    client.on_connect = device.on_connect
    client.on_publish = device.on_publish
    client.on_disconnect = device.on_disconnect

    print("Connecting...")
    client.connect(mqtt_bridge_hostname, mqtt_bridge_port)
    # NOTE: this is printed as soon as connect() returns; the broker's connection
    # result is reported separately through device.on_connect.
    print("Connected")
    return device, client
class Device(object):
    """Represents the state of a single device.

    Holds the connection flag maintained by the MQTT callbacks and the most
    recent temperature reading, taken either from a sensor file or simulated.
    """

    def __init__(self):
        # Maintained by the on_connect / on_disconnect callbacks.
        self.connected = False
        # Latest reading in degrees Celsius and Fahrenheit.
        self.temp_c = 0
        self.temp_f = 0
        # Path to the "w1_slave" file of the first /sys/bus/w1/devices/28-*
        # directory, or None when no such directory exists. Deferring the
        # failure lets the simulated mode run on machines without a sensor.
        sensor_dirs = glob("/sys/bus/w1/devices/28-*")
        self.temperature_file = join(sensor_dirs[0], "w1_slave") if sensor_dirs else None

    def _set_temperature(self, temp_c):
        """Store a Celsius reading together with its Fahrenheit equivalent."""
        self.temp_c = temp_c
        self.temp_f = round(self.temp_c * 9 / 5 + 32, 1)

    def update_temperature_data(self):
        """Update the temperature from the sensor file.

        Raises:
            FileNotFoundError: If no sensor file was found at construction time.
            OSError: If the sensor file cannot be read.
            ValueError: If the reading cannot be parsed as a number.
        """
        if self.temperature_file is None:
            raise FileNotFoundError(
                "No temperature sensor found under /sys/bus/w1/devices/28-*"
            )
        with open(self.temperature_file) as infile:
            infile.readline()  # The first line is not used.
            temperature_line = infile.readline()
        # The last space-separated field, minus its first two characters, is read
        # as thousandths of a degree Celsius (presumably of the form "t=<value>").
        self._set_temperature(round(float(temperature_line.split(" ")[-1][2:]) / 1000, 1))

    def update_temperature_data_simulated(self):
        """Update the temperature with a random whole number of degrees C in [21, 35]."""
        self._set_temperature(random.randint(21, 35))

    def on_connect(self, unused_client, unused_userdata, unused_flags, rc):
        """Callback for when a device connects."""
        print(f"Connection Result: {error_str(rc)}")
        # Only a zero result code means the broker accepted the connection.
        self.connected = rc == 0

    def on_disconnect(self, unused_client, unused_userdata, rc):
        """Callback for when a device disconnects."""
        print(f"Disconnected: {error_str(rc)}")
        self.connected = False

    def on_publish(self, unused_client, unused_userdata, unused_mid):
        """Callback when the device receives a PUBACK from the MQTT bridge."""
        print("Published message acked.")
def stream_temperature_values(wait_interval, project_id, region, registry_id, device_id, private_key_path, root_cert_path):
    """Stream temperature values to the GCP IoT platform.

    Connects once, then loops forever: reads the temperature, publishes it as
    a JSON telemetry event, and sleeps until the next multiple of
    ``wait_interval`` on the system clock. When the current JWT has reached
    ``jwt_exp_mins`` of age, the client is replaced by a newly connected one.

    Args:
        wait_interval: Seconds between readings; must be greater than 0.
        project_id: GCP project ID.
        region: Cloud region of the device registry.
        registry_id: Device registry ID.
        device_id: Device ID; used in the topic and included in each payload.
        private_key_path: Path to the private key used to sign JWTs.
        root_cert_path: Path to the CA certificate file used for TLS.

    Raises:
        ValueError: If ``wait_interval`` is not greater than 0.
    """
    # Fail before connecting: a zero interval would divide by zero in the sleep
    # computation below and a negative one would request a negative sleep.
    if not wait_interval > 0:
        raise ValueError(f"wait_interval must be greater than 0, got {wait_interval!r}")

    device, client = get_iot_device_client(project_id, region, registry_id, device_id, private_key_path, root_cert_path)

    # This is the topic that the device will publish telemetry events (temperature data) to.
    mqtt_telemetry_topic = f"/devices/{device_id}/events/temperature"

    # Track token age with the monotonic clock so that system clock adjustments
    # cannot distort the elapsed time.
    jwt_iat = time.monotonic()

    # Sync program to start running every 'wait_interval' seconds based on system clock
    time.sleep(wait_interval - time.time() % wait_interval)
    while True:
        client.loop()
        time_point = int(time.time())
        device.update_temperature_data()
        # device.update_temperature_data_simulated()  # Uncomment, then comment out the previous line if you want to publish simulated data
        payload = json.dumps({
            # Same "YYYY-MM-DD HH:MM:SS" UTC text that str(utcfromtimestamp(...)) gave
            # for a whole-second timestamp, without the deprecated call.
            "timestamp_utc": time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(time_point)),
            "timestamp_epoch": time_point,
            "temp_f": device.temp_f,
            "temp_c": device.temp_c,
            "device_id": device_id
        })
        print(f"Publishing payload to topic {mqtt_telemetry_topic}: {payload}")
        client.publish(mqtt_telemetry_topic, payload, qos=1)

        # Use the full elapsed time; timedelta.seconds is only the seconds
        # component and ignores whole days.
        minutes_since_issue = int((time.monotonic() - jwt_iat) / 60)
        if minutes_since_issue >= jwt_exp_mins:
            print(f"Refreshing token after {minutes_since_issue}m")
            jwt_iat = time.monotonic()
            # Close the old session before opening a replacement so it is not
            # left dangling under the same client ID.
            client.disconnect()
            device, client = get_iot_device_client(project_id, region, registry_id, device_id, private_key_path, root_cert_path)

        # Capture temperature every 'wait_interval' seconds, in sync with system clock
        time.sleep(wait_interval - time.time() % wait_interval)
def main():
    """Parse arguments, load the connection config, and start streaming.

    Reads the ``SETTINGS`` section of the config file named by ``--config`` and
    passes its values, along with ``--wait_interval``, to
    ``stream_temperature_values``. Invalid arguments or an unusable config file
    are reported through ``parser.error``, which exits with status 2.
    """
    # Parse arguments for GCP IoT connection configuration values
    parser = argparse.ArgumentParser(description="Connect and register a new device to the GCP IoT platform")
    parser.add_argument("-c", "--config",
                        type=str,
                        default="gcp_iot_config.txt",
                        metavar="ConnectionConfigFile",
                        help="File defining the path where cert files are stored, cert filenames, and an IoT endpoint")
    parser.add_argument("-w", "--wait_interval",
                        type=float,
                        default=1.0,
                        metavar="PublishWaitTime",
                        help="Time in seconds to wait before publishing a new temperature value")
    args = parser.parse_args()

    # The publish loop sleeps on multiples of this interval, so it must be positive.
    if not args.wait_interval > 0:
        parser.error("--wait_interval must be greater than 0")

    # Parse connection configuration values and establish MQTT client connection
    config = configparser.ConfigParser()
    # ConfigParser.read() silently skips files it cannot open and returns the list
    # of files it parsed, so an empty result means the config was never loaded.
    if not config.read(args.config):
        parser.error(f"could not read config file: {args.config}")
    if "SETTINGS" not in config:
        parser.error(f"config file {args.config} has no [SETTINGS] section")
    settings = config["SETTINGS"]

    required_keys = ("DEVICE_ID", "REGISTRY_ID", "REGION", "PROJECT_ID",
                     "KEY_PATH", "PRIVATE_KEY", "GOOGLE_ROOT_CERT")
    missing_keys = [key for key in required_keys if key not in settings]
    if missing_keys:
        parser.error(f"config file {args.config} is missing settings: {', '.join(missing_keys)}")

    device_id = settings["DEVICE_ID"]
    registry_id = settings["REGISTRY_ID"]
    region = settings["REGION"]
    project_id = settings["PROJECT_ID"]
    private_key_path = join(settings["KEY_PATH"], settings["PRIVATE_KEY"])
    root_cert_path = join(settings["KEY_PATH"], settings["GOOGLE_ROOT_CERT"])
    stream_temperature_values(args.wait_interval, project_id, region, registry_id, device_id, private_key_path, root_cert_path)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment