Skip to content

Instantly share code, notes, and snippets.

@amitkeret
Last active May 2, 2022 12:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save amitkeret/7f303957c8fd1bc14751599974a6a4fa to your computer and use it in GitHub Desktop.
Save amitkeret/7f303957c8fd1bc14751599974a6a4fa to your computer and use it in GitHub Desktop.
simplified RPi system sensors MQTT announcements (incl. Home Assistant auto-discovery)
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Simplified RPi system sensors MQTT announcements
# Inspired by the great work done by @Sennevds
#
# This script is meant to be executed via crontab.
# It does NOT maintain a persistent connection and/or employ a loop function to keep sending messages at defined intervals.
# Therefore, in order to make use of the LWT function, the keepAlive value is passed to the connection.
#
# Make sure to make your keepAlive time slightly longer than the crontab interval time!
# MQTT connection info
# Credentials passed to the MQTT client before connecting.
mqttuser = "****"
mqttpwd = "****"
# Broker address and port handed to client.connect().
mqtthost = "****"
mqttport = 1883
# Keep-alive value handed to client.connect(); per the note above it should be
# slightly longer than the crontab interval.
mqttkeepalive = 60
# RPi-related info
# Display name of this device; its lower-cased form is used in MQTT topics
# and unique ids.
piname = "****"
# Path whose filesystem usage is reported as the "disk_use" sensor.
disk_path = "/"
#
# (Hopefully) no need to edit past this line -------------------------------------------
#
import sys, os, re, subprocess, socket, json, time, datetime
import paho.mqtt.client as paho
# Memory figures in MiB taken from the last row of `free -t -m`
# (the "Total:" row added by -t, which sums RAM and swap).
tot_m, used_m, free_m = map(int, os.popen('free -t -m').readlines()[-1].split()[1:])
# CPU load derived from the aggregate "cpu" row of /proc/stat:
# (user + system) * 100 / (user + system + idle).
CPU_Pct = str(round(float(os.popen('''grep 'cpu ' /proc/stat | awk '{usage=($2+$4)*100/($2+$4+$5)} END {print usage }' ''').readline()),2))
# CPU usage
cpu = "%.1f" % float(CPU_Pct)
# Memory usage: share of the total that is in use.
# (The previous formula, (total - used) / total, reported the share NOT in use.)
memory = "%.1f" % (float(used_m) / tot_m * 100)
# Temperature: first decimal number in the `vcgencmd measure_temp` output.
treading = subprocess.check_output(['vcgencmd', 'measure_temp']).decode('UTF-8')
# Raw string avoids the invalid "\d" escape; the dot is escaped so it only
# matches a literal decimal point.
temp = str(re.findall(r'\d+\.\d+', treading)[0])
# Clock speed: the second integer in the `vcgencmd measure_clock arm` output,
# divided by 1,000,000.
creading = subprocess.check_output(['vcgencmd', 'measure_clock', 'arm']).decode('UTF-8')
clock = str(int(re.findall(r'\d+', creading)[1]) / 1000000)
# Hostname
host = socket.gethostname()
# Last message (UTC, ISO)
lastmsg = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
# @see https://stackoverflow.com/questions/54458308
def disk_usage(path):
    """Return the used share of the filesystem containing *path*.

    The value is computed from ``os.statvfs`` as
    ``(f_blocks - f_bfree) / f_blocks`` and returned as a percentage string
    with one decimal place (e.g. ``"42.3"``).

    A filesystem that reports zero total blocks yields ``"0.0"`` instead of
    raising ``ZeroDivisionError``.
    """
    st = os.statvfs(path)
    total = st.f_blocks * st.f_frsize
    if total == 0:
        # Nothing to divide by; report an empty filesystem as 0 % used.
        return "0.0"
    used = (st.f_blocks - st.f_bfree) * st.f_frsize
    return "%.1f" % (float(used) / total * 100)
# @see https://github.com/Sennevds/system_sensors/blob/master/src/sensors.py#L164
def get_host_ip():
    """Return this host's IPv4 address as a string.

    The address is taken from the local end of a UDP socket connected to
    8.8.8.8:80. If that fails, the address the hostname resolves to is
    returned; if the hostname cannot be resolved either, ``'127.0.0.1'`` is
    returned.
    """
    # Pre-bind the name so the ``finally`` clause is safe even when creating
    # the socket itself fails (previously that raised UnboundLocalError and
    # masked the fallback result).
    sock = None
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.connect(('8.8.8.8', 80))
        return sock.getsockname()[0]
    except socket.error:
        try:
            return socket.gethostbyname(socket.gethostname())
        except socket.gaierror:
            return '127.0.0.1'
    finally:
        if sock is not None:
            sock.close()
# Lower-cased device name, used in MQTT topics and unique ids.
pinamelc = piname.lower()
# Base topic for this device.
topic = "system-sensors/sensor/%s" % pinamelc
# Snapshot of every reading, keyed by sensor id.
sensors = dict(
    temperature=temp,
    clock_speed=clock,
    memory_use=memory,
    cpu_usage=cpu,
    disk_use=disk_usage(disk_path),
    hostname=host,
    last_message=lastmsg,
    ip=get_host_ip(),
)
# Per-sensor metadata, keyed by sensor id.
#   name  - human-readable label
#   unit  - optional unit of measurement
#   icon  - icon name (without prefix)
#   class - optional device class
ha_sensors = {
    "temperature": {
        "name": "Temperature",
        "unit": "°C",
        "icon": "thermometer",
        "class": "temperature"},
    "clock_speed": {
        "name": "Clock speed",
        # The published value is the measured clock divided by 1,000,000,
        # so the matching unit is MHz (it was wrongly declared as "Hz").
        "unit": "MHz",
        "icon": "speedometer"},
    "memory_use": {
        "name": "Memory usage",
        "unit": "%",
        "icon": "memory"},
    "cpu_usage": {
        "name": "CPU usage",
        "unit": "%",
        "icon": "cpu-64-bit"},
    "disk_use": {
        "name": "Disk usage",
        "unit": "%",
        "icon": "micro-sd"},
    "hostname": {
        "name": "Hostname",
        "icon": "card-account-details"},
    "last_message": {
        "name": "Last message",
        "icon": "clock-check",
        "class": "timestamp"},
    "ip": {
        "name": "Host IP",
        "icon": "lan"}
}
def add_device_info(config):
    """Attach the device description to a discovery config.

    The passed dict is modified in place (its ``device`` key is set) and is
    also returned, so calls can be nested.
    """
    device = {
        "identifiers": ["%s_sensor" % pinamelc],
        "name": "%s Sensors" % piname,
        "model": "RPI %s" % piname,
        "manufacturer": "RPI",
    }
    config['device'] = device
    return config
def add_availability_topic(config):
    """Add the availability topic and its payloads to a discovery config.

    The passed dict is modified in place and also returned, so calls can be
    nested.
    """
    config.update({
        "availability_topic": "%s/availability" % topic,
        "payload_available": "online",
        "payload_not_available": "offline",
    })
    return config
# Connect to the broker, publish the readings and the discovery configs,
# then disconnect and exit.
availability_topic = "%s/availability" % (topic)
state_topic = "%s/state" % (topic)

client = paho.Client()
client.username_pw_set(mqttuser, mqttpwd)
# The last will has to be registered BEFORE connect(): it is handed to the
# broker as part of the connection request, so setting it afterwards (as was
# done previously) has no effect.
client.will_set(availability_topic, "offline", 1, True)
client.connect(mqtthost, mqttport, mqttkeepalive)

# Current readings as one JSON document (QoS 1, not retained).
client.publish(state_topic, json.dumps(sensors), 1, False)

# One retained discovery config per sensor; each extracts its own value from
# the shared JSON state via value_template.
for index, sensor in ha_sensors.items():
    config = {
        "name": "%s %s" % (piname, sensor['name']),
        "state_topic": state_topic,
        "value_template": "{{ value_json.%s }}" % (index),
        "unique_id": "%s_sensor_%s" % (pinamelc, index),
        "icon": "mdi:%s" % (sensor['icon'])
    }
    if "unit" in sensor:
        config['unit_of_measurement'] = sensor['unit']
    if "class" in sensor:
        config['device_class'] = sensor['class']
    payload = json.dumps(add_device_info(add_availability_topic(config)))
    client.publish("homeassistant/sensor/%s/%s/config" % (pinamelc, index), payload, 1, True)

# Mark the device as available (retained).
client.publish(availability_topic, "online", 1, True)

# Connectivity binary sensor driven by the availability topic.
client.publish("homeassistant/binary_sensor/%s/connected/config" % (pinamelc), json.dumps(add_device_info({
    "name": "%s Connected" % (sensors['hostname']),
    "state_topic": availability_topic,
    "value_template": "{{ value }}",
    "unique_id": "%s_sensor_connected" % (pinamelc),
    "payload_on": "online",
    "payload_off": "offline",
    "device_class": "connectivity"
})), 1, True)

# NOTE(review): short pause before disconnecting, presumably to let the
# outgoing messages leave the socket; confirm it is still needed.
time.sleep(1)
client.disconnect()
sys.exit()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment