Skip to content

Instantly share code, notes, and snippets.

@amitkeret
Last active May 2, 2022 12:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save amitkeret/c756148fe4679fac6eaf39c590ac9e82 to your computer and use it in GitHub Desktop.
Save amitkeret/c756148fe4679fac6eaf39c590ac9e82 to your computer and use it in GitHub Desktop.
Check peripheral connectivity to Raspberry Pi and report via MQTT
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""Check peripheral connectivity to a Raspberry Pi and report it via MQTT.

Each run:

1. queries the HDMI-CEC power status through ``cec-client``;
2. lists the connected Bluetooth devices through ``hcitool con``;
3. exits early when that state is identical to the one saved by the
   previous run;
4. otherwise publishes the state, its attributes and the Home Assistant
   auto-discovery configuration to the MQTT broker, and only then saves the
   new state so that a failed publish is retried on the next run.

The script is written to run unchanged on Python 2.7 and Python 3.
"""
import datetime
import errno
import hashlib
import json
import os
import re
import subprocess
import sys
import time

import paho.mqtt.client as paho

mqttuser = "*****"
mqttpwd = "*****"
mqtthost = "*****"
mqttport = 1883
mqttkeepalive = 60
piname = "*****"

# Resolve to an absolute directory so the state/log files land next to the
# script regardless of the current working directory.
script_dir = os.path.dirname(os.path.abspath(__file__))
state_path = os.path.join(script_dir, "periph_mqtt_state.md5")
log_path = os.path.join(script_dir, "periph_mqtt.log")

# Power state
# universal_newlines=True makes check_output return text instead of bytes on
# Python 3, so the str operations below work on both major versions.
output = subprocess.check_output(
    "echo 'pow 0' | cec-client -s | grep 'power status:'",
    shell=True,
    universal_newlines=True,
)
power = output.strip().replace('power status: ', '')

# Unsure why this happens; but ignore this run and exit
if power == "unknown":
    sys.exit()

# Connected BT devices: keep only the tokens that look like a MAC address
bt = subprocess.check_output(['hcitool', 'con'], universal_newlines=True).split()
btdevices = [x for x in bt if re.match(r'(\w{2}:){5}\w{2}', x)]
btdevices_count = len(btdevices)
btdevices_state = "on" if btdevices_count > 0 else "off"

# Serializing and comparing the current state to the last time we checked
# No need to continue if no state has changed
state_hash = hashlib.md5(json.dumps([power, btdevices]).encode()).hexdigest()
try:
    with open(state_path, "r") as state_file:
        last_hash = state_file.read().strip()
except IOError as err:
    # A missing state file just means this is the first run; anything else
    # (e.g. a permission problem) is a real error and must surface.
    if err.errno != errno.ENOENT:
        raise
    last_hash = None
if state_hash == last_hash:
    sys.exit()

# Last message (UTC, ISO)
lastmsg = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')

# Log this run's timestamp
with open(log_path, "a") as log_file:
    log_file.write(lastmsg + " " + json.dumps([power, btdevices]) + "\n")

# Setup state topic status
sensors = {
    "power": power,
    "bt_devices": btdevices_state,
}

# Setup attributes topic status
attributes = {
    "power": {},
    "bt_devices": {
        "devices_count": btdevices_count,
        "devices": btdevices,
    },
}
for sensor_attributes in attributes.values():
    sensor_attributes["last_message"] = lastmsg

# Setup Home Assistant auto-discovery
ha_sensors = {
    "power": {
        "name": "Power",
        "device_class": "power",
        "payload_on": "on",
        "payload_off": "standby",
    },
    "bt_devices": {
        "name": "Connected BT devices",
        "icon": "mdi:bluetooth" + ("-off" if btdevices_count == 0 else ""),
        "payload_on": "on",
        "payload_off": "off",
        "device_class": "connectivity",
    },
}
pinamelc = piname.lower()
topic = "system-sensors/binary_sensor/%s" % (pinamelc)
for index, sensor in ha_sensors.items():
    # Both sensors share one state topic and one attributes topic; the
    # templates pick this sensor's entry out of the shared JSON payload.
    sensor["name"] = "%s %s" % (piname, sensor["name"])
    sensor["unique_id"] = "%s_binary_sensor_%s" % (pinamelc, index)
    sensor["state_topic"] = "%s/state" % (topic)
    sensor["value_template"] = "{{ value_json.%s }}" % (index)
    sensor["json_attributes_topic"] = "%s/attr" % (topic)
    sensor["json_attributes_template"] = "{{ value_json.%s | tojson }}" % (index)

# Start MQTT connection
client = paho.Client()
client.username_pw_set(mqttuser, mqttpwd)
# The will is transmitted as part of the CONNECT packet, so it has to be
# registered before connect() is called.
client.will_set("%s/availability" % (topic), "offline", 1, True)
client.connect(mqtthost, mqttport, mqttkeepalive)
# Run the network loop in the background so that the broker's CONNACK and the
# PUBACKs for the QoS 1 messages below are actually processed.
client.loop_start()

# Publish states
client.publish("%s/state" % (topic), json.dumps(sensors), 1, False)
client.publish("%s/attr" % (topic), json.dumps(attributes), 1, False)

# A different approach; publish each sensor in its own topic
#for index, sensor in sensors.items():
#    client.publish("%s/%s" % (topic, index), sensor, 1, True)
#    client.publish("%s/%s/attr" % (topic, index), json.dumps(attributes[index]), 1, True)

# Publish Home Assistant auto-discovery config
for index, sensor in ha_sensors.items():
    client.publish("homeassistant/binary_sensor/%s/%s/config" % (pinamelc, index), json.dumps(sensor), 1, True)
client.publish("%s/availability" % (topic), "online", 1, True)

# Give the background loop a moment to flush the queued messages
time.sleep(1)
client.disconnect()
client.loop_stop()

# Save the current state only after publishing: if anything above raised
# (e.g. the broker was unreachable) the next run still sees a changed state
# and tries to report it again.
with open(state_path, "w") as state_file:
    state_file.write(state_hash)

sys.exit()
# systemd unit that keeps the peripheral-state reporting loop running.
[Unit]
Description=Continuous loop reporting state of peripherals.
[Service]
# Type=simple: the process started by ExecStart is the service's main process.
Type=simple
# Runs the wrapper shell script with bash (presumably the endless-loop script
# that repeatedly invokes the Python reporter - confirm the installed path).
ExecStart=/bin/bash /scripts/periph_mqtt.sh
[Install]
# When the unit is enabled, it is pulled in by the multi-user target.
WantedBy=multi-user.target
#!/bin/bash
# Run the peripheral reporter forever, pausing 10 seconds between runs.
# The exit status of the Python script is ignored, so a failed run is simply
# followed by the next attempt.
while :; do
    echo "Running..."
    python /scripts/periph_mqtt.py
    sleep 10
done
@amitkeret
Copy link
Author

cron cannot schedule a job more often than once per minute, so the script is run from an endless loop (with a 10-second sleep) instead.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment