Skip to content

Instantly share code, notes, and snippets.

@Magalex2x14
Last active November 6, 2019 09:02
Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save Magalex2x14/7389b840d873824dcabf93a193fe2ca8 to your computer and use it in GitHub Desktop.
Save Magalex2x14/7389b840d873824dcabf93a193fe2ca8 to your computer and use it in GitHub Desktop.
averaging (60s) mitemp_bt without polling
"""Platform for sensor integration."""
from homeassistant.const import DEVICE_CLASS_TEMPERATURE, DEVICE_CLASS_HUMIDITY, TEMP_CELSIUS, ATTR_BATTERY_LEVEL
from homeassistant.helpers.entity import Entity
import homeassistant.util.dt as dt_util
from homeassistant.helpers.event import track_point_in_utc_time
import logging
import functools
import itertools
import re
import platform
import signal
import sys
import time
import threading
import subprocess
import pexpect
from uuid import UUID
from contextlib import contextmanager
from datetime import timedelta
import tempfile
import statistics
_LOGGER = logging.getLogger(__name__)
# ----------------------
# SOME OPTIONS TO ADJUST
# ----------------------
CONF_MITEMPBT_ROUNDING = True # enable/disable rounding of the average of all measurements taken within CONF_MITEMPBT_PERIOD seconds
CONF_MITEMPBT_DECIMALS = 2 # to how many decimal places to round if rounding is enabled
CONF_MITEMPBT_PERIOD = 60 # the period in seconds during which the sensor readings are collected and transmitted to HA after averaging
#
CONF_MITEMPBT_TMIN = -9.9 # sensor measurement limits to exclude erroneous spikes from the results
CONF_MITEMPBT_TMAX = 60.0 #
CONF_MITEMPBT_HMIN = 0.0 #
CONF_MITEMPBT_HMAX = 99.9 #
CONF_MITEMPBT_LOG_SPIKES = False # put information about each erroneous spike in the HA log
CONF_MITEMPBT_USE_MEDIAN = False # use median as sensor output instead of mean (helps with "spiky" sensors).
# please note that both the median and the average in any case are present as the sensor state attributes.
# ----------------------
def parseRawData(data):
result = {}
adv_index = data.find("020106")
if adv_index == -1:
return None
payload_length = (len(data) - adv_index - 18*2)/2
if payload_length < 4:
return None
mac_reversed = data[adv_index+24:adv_index+36]
source_mac_reversed = data[adv_index-14:adv_index-2]
if mac_reversed != source_mac_reversed:
return None
mac = mac_reversed[10:12]+mac_reversed[8:10]+mac_reversed[6:8]+mac_reversed[4 :6 ]+mac_reversed[2:4 ]+mac_reversed[0:2]
packetId = int(data[adv_index+22:adv_index+24], 16)
type_start = adv_index + 36
type = data[type_start:type_start+2]
length = data[type_start+4:type_start+6]
if type == "0D" and length == "04" and (payload_length == 8 or payload_length == 12):
temperature_hex = data[type_start+6:type_start+10]
humidity_hex = data[type_start+10:type_start+14]
temperature = ((int(temperature_hex[2:4], 16) << 8) + int(temperature_hex[0:2], 16))/10
humidity = ((int(humidity_hex[2:4], 16) << 8) + int(humidity_hex[0:2], 16))/10
result = { "temperature" : temperature, "humidity" : humidity, "mac": mac, "packet": packetId }
if payload_length == 12:
result["battery"] = int(data[type_start+20:type_start+22],16)
if type == "04" and length == "02" and (payload_length == 6 or payload_length == 10):
temperature_hex = data[type_start+6:type_start+10]
temperature = ((int(temperature_hex[2:4], 16) << 8) + int(temperature_hex[0:2], 16))/10
result = { "temperature" : temperature, "mac": mac, "packet": packetId }
if payload_length == 10:
result["battery"] = int(data[type_start+16:type_start+18],16)
if type == "06" and length == "02" and (payload_length == 6 or payload_length == 10):
humidity_hex = data[type_start+6:type_start+10]
humidity = ((int(humidity_hex[2:4], 16) << 8) + int(humidity_hex[0:2], 16))/10
result = { "humidity" : humidity, "mac": mac, "packet": packetId }
if payload_length == 10:
result["battery"] = int(data[type_start+16:type_start+18],16)
if type == "0A" and length == "01" and payload_length == 5:
battery = int(data[type_start+6:type_start+8],16)
result = { "battery" : battery, "mac": mac, "packet": packetId }
return result
class BLEScanner:
hcitool = None
hcidump = None
tempf = tempfile.TemporaryFile()
_LOGGER.debug('Temp dir used: {}'.format(tempfile.gettempdir()))
def start(self):
_LOGGER.debug('Start receiving broadcasts')
DEVNULL = subprocess.DEVNULL if sys.version_info > (3, 0) else open(os.devnull, 'wb')
# sudo setcap 'cap_net_raw+ep' `readlink -f \`which hcidump\``
self.hcitool = subprocess.Popen(['hcitool', 'lescan', '--duplicates'], stdout = DEVNULL, stderr = DEVNULL)
self.hcidump = subprocess.Popen(['hcidump', '--raw', 'hci'], stdout=self.tempf, stderr=None)
def stop(self):
_LOGGER.debug('Stop receiving broadcasts')
self.hcitool.kill()
self.hcidump.kill()
def get_lines(self):
data = None
try:
_LOGGER.debug("reading hcidump...")
self.tempf.seek(0)
for line in self.tempf.readlines():
line = line.decode()
#_LOGGER.debug(line)
if line.startswith('> '):
yield data
data = line[2:].strip().replace(' ', '')
elif line.startswith('< '):
data = None
else:
if data:
data += line.strip().replace(' ', '')
self.tempf.seek(0)
self.tempf.truncate(0)
except Exception as ex:
_LOGGER.debug(ex)
return []
def setup_platform(hass, config, add_entities, discovery_info=None):
"""Set up the sensor platform."""
_LOGGER.debug("Starting")
scanner = BLEScanner()
scanner.start()
sensors_by_mac = {}
def discover_ble_devices():
"""Discover Bluetooth LE devices."""
_LOGGER.debug("Discovering Bluetooth LE devices")
try:
_LOGGER.debug("Stopping")
scanner.stop()
_LOGGER.debug("Analyzing")
hum_m_data = {}
temp_m_data = {}
batt = {} # battery
lpacket = {} # last packet number
macs = {} # all found macs
for line in scanner.get_lines():
if line:
#_LOGGER.debug("Line: {}".format(line))
data = parseRawData(line)
if data and "mac" in data:
_LOGGER.debug("Parsed: {}".format(data))
# store found readings per device
if "temperature" in data:
if CONF_MITEMPBT_TMAX >= data["temperature"] >= CONF_MITEMPBT_TMIN:
if data["mac"] not in temp_m_data:
temp_m_data[data["mac"]] = []
temp_m_data[data["mac"]].append(data["temperature"])
macs[data["mac"]] = data["mac"]
elif CONF_MITEMPBT_LOG_SPIKES:
_LOGGER.error("Temperature spike: {}".format(data["temperature"]))
if "humidity" in data:
if CONF_MITEMPBT_HMAX >= data["humidity"] >= CONF_MITEMPBT_HMIN:
if data["mac"] not in hum_m_data:
hum_m_data[data["mac"]] = []
hum_m_data[data["mac"]].append(data["humidity"])
macs[data["mac"]] = data["mac"]
elif CONF_MITEMPBT_LOG_SPIKES:
_LOGGER.error("Humidity spike: {}".format(data["humidity"]))
if "battery" in data:
batt[data["mac"]] = int(data["battery"])
macs[data["mac"]] = data["mac"]
lpacket[data["mac"]] = int(data["packet"])
# for every seen device
for mac in macs:
if mac in sensors_by_mac:
sensors = sensors_by_mac[mac]
else:
sensors = [TemperatureSensor(mac), HumiditySensor(mac)]
sensors_by_mac[mac] = sensors
add_entities(sensors)
sensors[0]._device_state_attributes["last packet id"] = lpacket[mac]
sensors[1]._device_state_attributes["last packet id"] = lpacket[mac]
if mac in batt:
sensors[0]._device_state_attributes[ATTR_BATTERY_LEVEL] = batt[mac]
sensors[1]._device_state_attributes[ATTR_BATTERY_LEVEL] = batt[mac]
# averaging and states updating
tempstate_mean = None
humstate_mean = None
tempstate_median = None
humstate_median = None
if CONF_MITEMPBT_USE_MEDIAN:
textattr = "last median of"
else:
textattr = "last mean of"
if mac in temp_m_data:
try:
if CONF_MITEMPBT_ROUNDING:
tempstate_median = round(statistics.median(temp_m_data[mac]), CONF_MITEMPBT_DECIMALS)
tempstate_mean = round(statistics.mean(temp_m_data[mac]), CONF_MITEMPBT_DECIMALS)
else:
tempstate_median = statistics.median(temp_m_data[mac])
tempstate_mean = statistics.mean(temp_m_data[mac])
if CONF_MITEMPBT_USE_MEDIAN:
sensors[0]._state = tempstate_median
else:
sensors[0]._state = tempstate_mean
sensors[0]._device_state_attributes[textattr] = len(temp_m_data[mac])
sensors[0]._device_state_attributes["median"] = tempstate_median
sensors[0]._device_state_attributes["mean"] = tempstate_mean
sensors[0].async_schedule_update_ha_state()
except AttributeError:
_LOGGER.info("Sensor {} not yet ready for update".format(mac))
except ZeroDivisionError:
_LOGGER.error("mitemp_bt: Division by zero while temperature averaging!")
continue
if mac in hum_m_data:
try:
if CONF_MITEMPBT_ROUNDING:
humstate_median = round(statistics.median(hum_m_data[mac]), CONF_MITEMPBT_DECIMALS)
humstate_mean = round(statistics.mean(hum_m_data[mac]), CONF_MITEMPBT_DECIMALS)
else:
humstate_median = statistics.median(hum_m_data[mac])
humstate_mean = statistics.mean(hum_m_data[mac])
if CONF_MITEMPBT_USE_MEDIAN:
sensors[1]._state = humstate_median
else:
sensors[1]._state = humstate_mean
sensors[1]._device_state_attributes[textattr] = len(hum_m_data[mac])
sensors[1]._device_state_attributes["median"] = humstate_median
sensors[1]._device_state_attributes["mean"] = humstate_mean
sensors[1].async_schedule_update_ha_state()
except AttributeError:
_LOGGER.info("Sensor {} not yet ready for update".format(mac))
except ZeroDivisionError:
_LOGGER.error("mitemp_bt: Division by zero while humidity averaging!")
continue
scanner.start()
except RuntimeError as error:
_LOGGER.error("Error during Bluetooth LE scan: %s", error)
return {}
return []
def update_ble(now):
_LOGGER.debug("update_ble called")
"""Lookup Bluetooth LE devices and update status."""
devs = discover_ble_devices()
track_point_in_utc_time(hass, update_ble, dt_util.utcnow() + timedelta(seconds=CONF_MITEMPBT_PERIOD))
update_ble(dt_util.utcnow())
class TemperatureSensor(Entity):
"""Representation of a Sensor."""
def __init__(self, mac):
"""Initialize the sensor."""
self._state = None
self._battery = None
self._unique_id = "t_"+mac
self._device_state_attributes = {}
@property
def name(self):
"""Return the name of the sensor."""
return 'mi {}'.format(self._unique_id)
@property
def state(self):
"""Return the state of the sensor."""
return self._state
@property
def unit_of_measurement(self):
"""Return the unit of measurement."""
return TEMP_CELSIUS
@property
def device_class(self):
"""Return the unit of measurement."""
return DEVICE_CLASS_TEMPERATURE
@property
def should_poll(self):
"""No polling needed."""
return False
@property
def device_state_attributes(self):
"""Return the state attributes."""
return self._device_state_attributes
@property
def unique_id(self) -> str:
"""Return a unique ID."""
return self._unique_id
@property
def force_update(self):
"""Force update."""
return True
class HumiditySensor(Entity):
"""Representation of a Sensor."""
def __init__(self, mac):
"""Initialize the sensor."""
self._state = None
self._battery = None
self._unique_id = "h_"+mac
self._device_state_attributes = {}
@property
def name(self):
"""Return the name of the sensor."""
return 'mi {}'.format(self._unique_id)
@property
def state(self):
"""Return the state of the sensor."""
return self._state
@property
def unit_of_measurement(self):
"""Return the unit of measurement."""
return "%"
@property
def device_class(self):
"""Return the unit of measurement."""
return DEVICE_CLASS_HUMIDITY
@property
def should_poll(self):
"""No polling needed."""
return False
@property
def device_state_attributes(self):
"""Return the state attributes."""
return self._device_state_attributes
@property
def unique_id(self) -> str:
"""Return a unique ID."""
return self._unique_id
@property
def force_update(self):
"""Force update."""
return True
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment