Created
December 15, 2020 00:58
-
-
Save doit-mattporter/7e548158a5dc03b3674282cda19ef663 to your computer and use it in GitHub Desktop.
Stream real-time Raspberry Pi-derived (or simulated) temperature values to GCP IoT Core
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import argparse | |
import configparser | |
import datetime | |
import json | |
import jwt | |
import paho.mqtt.client as mqtt | |
import random | |
import ssl | |
import time | |
from glob import glob | |
from os.path import join | |
# Lifetime of each JWT, in minutes. The streaming loop compares the age of the
# current token against this same value to decide when to reconnect.
jwt_exp_mins = 60


def create_jwt(project_id, private_key_file):
    """Create a JWT (https://jwt.io) to establish an MQTT connection.

    Args:
        project_id: Value stored in the token's ``aud`` claim.
        private_key_file: Path to the private key file used to sign the token.

    Returns:
        The token produced by ``jwt.encode`` using the ES256 algorithm.

    Raises:
        OSError: If ``private_key_file`` cannot be opened or read.
    """
    # Read the clock once so that 'exp' is exactly jwt_exp_mins after 'iat'
    # (two separate reads would drift apart by the time between the calls).
    issued_at = datetime.datetime.now(datetime.timezone.utc)
    token = {
        'iat': issued_at,
        'exp': issued_at + datetime.timedelta(minutes=jwt_exp_mins),
        'aud': project_id
    }
    with open(private_key_file, 'r') as f:
        private_key = f.read()
    print(f"Creating JWT using ES256 from private key file {private_key_file}")
    return jwt.encode(token, private_key, algorithm="ES256")
def error_str(rc):
    """Format a Paho result code as ``"<code>: <description>"``."""
    description = mqtt.error_string(rc)
    return f"{rc}: {description}"
def get_iot_device_client(project_id, region, registry_id, device_id, private_key_path, root_cert_path):
    """Create a Device plus an MQTT client bound to it and start connecting.

    The client ID is built from the project, region, registry and device IDs;
    the password is a freshly created JWT and the username is the literal
    "unused". The Device's callbacks are attached before ``connect`` is called.

    Returns:
        A ``(device, client)`` tuple.
    """
    device = Device()

    client_id = f"projects/{project_id}/locations/{region}/registries/{registry_id}/devices/{device_id}"
    client = mqtt.Client(client_id=client_id)

    # Credentials: the signed token travels in the password field.
    signed_token = create_jwt(project_id, private_key_path)
    client.username_pw_set(username="unused", password=signed_token)
    client.tls_set(ca_certs=root_cert_path, tls_version=ssl.PROTOCOL_TLSv1_2)

    # Route the client's connection/publish events to the Device's handlers.
    for callback_name in ("on_connect", "on_publish", "on_disconnect"):
        setattr(client, callback_name, getattr(device, callback_name))

    print("Connecting...")
    client.connect("mqtt.googleapis.com", 8883)
    print("Connected")

    return device, client
class Device(object):
    """Represents the state of a single device.

    Attributes set by ``__init__``:
        connected: True only while the last connection attempt succeeded and
            no disconnect has been reported since.
        temp_c / temp_f: Most recent temperature in Celsius / Fahrenheit.
        temperature_file: Path of the ``w1_slave`` file inside the first
            directory matching ``/sys/bus/w1/devices/28-*``, or None when no
            such directory exists.
    """

    def __init__(self):
        self.connected = False
        self.temp_c = 0
        self.temp_f = 0
        # Tolerate a missing sensor here so that a Device can still be built
        # (e.g. for simulated readings) on a machine without the hardware.
        self.temperature_file = self._find_temperature_file()

    @staticmethod
    def _find_temperature_file():
        """Return the first matching sensor's w1_slave path, or None."""
        sensor_dirs = glob("/sys/bus/w1/devices/28-*")
        if not sensor_dirs:
            return None
        return join(sensor_dirs[0], "w1_slave")

    def _set_celsius(self, temp_c):
        """Store a Celsius reading and its Fahrenheit equivalent (1 decimal)."""
        self.temp_c = temp_c
        self.temp_f = round(temp_c * 9 / 5 + 32, 1)

    def update_temperature_data(self):
        """Update the temperature from the sensor file.

        Raises:
            FileNotFoundError: If no sensor file could be located.
        """
        if self.temperature_file is None:
            # The sensor may have appeared since construction; look again.
            self.temperature_file = self._find_temperature_file()
        if self.temperature_file is None:
            raise FileNotFoundError(
                "No temperature sensor found matching /sys/bus/w1/devices/28-*"
            )
        with open(self.temperature_file) as infile:
            infile.readline()  # The first line is not used.
            temperature_line = infile.readline()
        # The reading is the last space-separated field of the second line
        # with its two-character prefix removed (presumably "t=" - confirm
        # against the sensor's output), expressed in thousandths of a degree.
        raw_reading = temperature_line.split(" ")[-1][2:]
        self._set_celsius(round(float(raw_reading) / 1000, 1))

    def update_temperature_data_simulated(self):
        """Update the temperature with a random integer from 21 to 35 degrees C."""
        self._set_celsius(random.randint(21, 35))

    def on_connect(self, unused_client, unused_userdata, unused_flags, rc):
        """Callback for when a device connects."""
        print(f"Connection Result: {error_str(rc)}")
        # A non-zero result code means the connection was refused, so only
        # mark the device as connected on success.
        self.connected = rc == 0

    def on_disconnect(self, unused_client, unused_userdata, rc):
        """Callback for when a device disconnects."""
        print(f"Disconnected: {error_str(rc)}")
        self.connected = False

    def on_publish(self, unused_client, unused_userdata, unused_mid):
        """Callback when the device receives a PUBACK from the MQTT bridge."""
        print("Published message acked.")
def stream_temperature_values(wait_interval, project_id, region, registry_id, device_id, private_key_path, root_cert_path):
    """Stream temperature values to the GCP IoT platform.

    Connects once, then loops forever: service the MQTT client, read the
    temperature, publish it as JSON with QoS 1, and sleep until the next
    multiple of ``wait_interval`` seconds on the system clock. Once the
    current token is ``jwt_exp_mins`` minutes old the client is disconnected
    and a new one (with a new token) is created.

    Args:
        wait_interval: Seconds between readings; must be greater than zero
            because it is used as a modulus.
        project_id, region, registry_id, device_id: Identify the device; passed
            through to ``get_iot_device_client``.
        private_key_path: Private key file used to sign the JWT.
        root_cert_path: CA certificate file used for TLS.

    This function does not return.
    """
    device, client = get_iot_device_client(project_id, region, registry_id, device_id, private_key_path, root_cert_path)

    # This is the topic that the device will publish telemetry events (temperature data) to.
    mqtt_telemetry_topic = f"/devices/{device_id}/events/temperature"

    jwt_iat = datetime.datetime.now(datetime.timezone.utc)

    # Update and publish temperature readings every 'wait_interval' seconds
    time.sleep(wait_interval - time.time() % wait_interval)  # Sync program to start running every 'wait_interval' seconds based on system clock
    while True:
        client.loop()
        time_point = int(time.time())
        device.update_temperature_data()
        # device.update_temperature_data_simulated()  # Uncomment, then comment out the previous line if you want to publish simulated data

        # Naive UTC timestamp so the string has no "+00:00" suffix.
        timestamp_utc = datetime.datetime.fromtimestamp(
            time_point, datetime.timezone.utc
        ).replace(tzinfo=None)
        payload = json.dumps({
            "timestamp_utc": str(timestamp_utc),
            "timestamp_epoch": time_point,
            "temp_f": device.temp_f,
            "temp_c": device.temp_c,
            "device_id": device_id
        })
        print(f"Publishing payload to topic {mqtt_telemetry_topic}: {payload}")
        client.publish(mqtt_telemetry_topic, payload, qos=1)

        # total_seconds() rather than .seconds: the latter wraps at one day and
        # would never reach a jwt_exp_mins of 1440 or more.
        token_age = datetime.datetime.now(datetime.timezone.utc) - jwt_iat
        minutes_since_issue = int(token_age.total_seconds() // 60)
        # NOTE(review): this fires only once the token's full lifetime has
        # elapsed; consider refreshing slightly earlier - confirm against the
        # broker's behaviour for expired tokens.
        if minutes_since_issue >= jwt_exp_mins:
            print(f"Refreshing token after {minutes_since_issue}m")
            jwt_iat = datetime.datetime.now(datetime.timezone.utc)
            # Release the old connection before replacing the client so its
            # socket is not leaked on every refresh.
            client.disconnect()
            device, client = get_iot_device_client(project_id, region, registry_id, device_id, private_key_path, root_cert_path)

        time.sleep(wait_interval - time.time() % wait_interval)  # Capture temperature every 'wait_interval' seconds, in sync with system clock
def main():
    """Parse the command line and config file, then start streaming.

    Exits through ``parser.error`` (usage message, exit status 2) when the wait
    interval is not positive, when the config file cannot be read, or when it
    lacks the SETTINGS section or one of the required keys.
    """
    # Parse arguments for GCP IoT connection configuration values
    parser = argparse.ArgumentParser(description="Connect and register a new device to the GCP IoT platform")
    parser.add_argument("-c", "--config",
                        type=str,
                        default="gcp_iot_config.txt",
                        metavar="ConnectionConfigFile",
                        help="File defining the path where cert files are stored, cert filenames, and an IoT endpoint")
    parser.add_argument("-w", "--wait_interval",
                        type=float,
                        default=1.0,
                        metavar="PublishWaitTime",
                        help="Time in seconds to wait before publishing a new temperature value")
    args = parser.parse_args()

    # The interval is used as a modulus when scheduling, so zero would raise
    # ZeroDivisionError and a negative value would yield nonsensical sleeps.
    if not args.wait_interval > 0:
        parser.error(f"--wait_interval must be greater than 0, got {args.wait_interval}")

    # Parse connection configuration values
    config = configparser.ConfigParser()
    # ConfigParser.read() skips unreadable files silently and returns the list
    # of files it did parse; an empty list means the config was not loaded.
    if not config.read(args.config):
        parser.error(f"could not read configuration file: {args.config}")
    if "SETTINGS" not in config:
        parser.error(f"configuration file {args.config} has no [SETTINGS] section")
    settings = config["SETTINGS"]

    required_keys = ("DEVICE_ID", "REGISTRY_ID", "REGION", "PROJECT_ID",
                     "KEY_PATH", "PRIVATE_KEY", "GOOGLE_ROOT_CERT")
    missing_keys = [key for key in required_keys if key not in settings]
    if missing_keys:
        parser.error(f"configuration file {args.config} is missing: {', '.join(missing_keys)}")

    device_id = settings["DEVICE_ID"]
    registry_id = settings["REGISTRY_ID"]
    region = settings["REGION"]
    project_id = settings["PROJECT_ID"]
    private_key_path = join(settings["KEY_PATH"], settings["PRIVATE_KEY"])
    root_cert_path = join(settings["KEY_PATH"], settings["GOOGLE_ROOT_CERT"])

    # Establish the MQTT client connection and publish readings indefinitely
    stream_temperature_values(args.wait_interval, project_id, region, registry_id, device_id, private_key_path, root_cert_path)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment