-
-
Save Magalex2x14/7389b840d873824dcabf93a193fe2ca8 to your computer and use it in GitHub Desktop.
averaging (60s) mitemp_bt without polling
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Platform for sensor integration.""" | |
from homeassistant.const import DEVICE_CLASS_TEMPERATURE, DEVICE_CLASS_HUMIDITY, TEMP_CELSIUS, ATTR_BATTERY_LEVEL | |
from homeassistant.helpers.entity import Entity | |
import homeassistant.util.dt as dt_util | |
from homeassistant.helpers.event import track_point_in_utc_time | |
import logging | |
import functools | |
import itertools | |
import re | |
import platform | |
import signal | |
import sys | |
import time | |
import threading | |
import subprocess | |
import pexpect | |
from uuid import UUID | |
from contextlib import contextmanager | |
from datetime import timedelta | |
import tempfile | |
import statistics | |
_LOGGER = logging.getLogger(__name__) | |
# ---------------------- | |
# SOME OPTIONS TO ADJUST | |
# ---------------------- | |
CONF_MITEMPBT_ROUNDING = True # enable/disable rounding of the average of all measurements taken within CONF_MITEMPBT_PERIOD seconds | |
CONF_MITEMPBT_DECIMALS = 2 # to how many decimal places to round if rounding is enabled | |
CONF_MITEMPBT_PERIOD = 60 # the period in seconds during which the sensor readings are collected and transmitted to HA after averaging | |
# | |
CONF_MITEMPBT_TMIN = -9.9 # sensor measurement limits to exclude erroneous spikes from the results | |
CONF_MITEMPBT_TMAX = 60.0 # | |
CONF_MITEMPBT_HMIN = 0.0 # | |
CONF_MITEMPBT_HMAX = 99.9 # | |
CONF_MITEMPBT_LOG_SPIKES = False # put information about each erroneous spike in the HA log | |
CONF_MITEMPBT_USE_MEDIAN = False # use median as sensor output instead of mean (helps with "spiky" sensors). | |
# please note that both the median and the average in any case are present as the sensor state attributes. | |
# ---------------------- | |
def parseRawData(data): | |
result = {} | |
adv_index = data.find("020106") | |
if adv_index == -1: | |
return None | |
payload_length = (len(data) - adv_index - 18*2)/2 | |
if payload_length < 4: | |
return None | |
mac_reversed = data[adv_index+24:adv_index+36] | |
source_mac_reversed = data[adv_index-14:adv_index-2] | |
if mac_reversed != source_mac_reversed: | |
return None | |
mac = mac_reversed[10:12]+mac_reversed[8:10]+mac_reversed[6:8]+mac_reversed[4 :6 ]+mac_reversed[2:4 ]+mac_reversed[0:2] | |
packetId = int(data[adv_index+22:adv_index+24], 16) | |
type_start = adv_index + 36 | |
type = data[type_start:type_start+2] | |
length = data[type_start+4:type_start+6] | |
if type == "0D" and length == "04" and (payload_length == 8 or payload_length == 12): | |
temperature_hex = data[type_start+6:type_start+10] | |
humidity_hex = data[type_start+10:type_start+14] | |
temperature = ((int(temperature_hex[2:4], 16) << 8) + int(temperature_hex[0:2], 16))/10 | |
humidity = ((int(humidity_hex[2:4], 16) << 8) + int(humidity_hex[0:2], 16))/10 | |
result = { "temperature" : temperature, "humidity" : humidity, "mac": mac, "packet": packetId } | |
if payload_length == 12: | |
result["battery"] = int(data[type_start+20:type_start+22],16) | |
if type == "04" and length == "02" and (payload_length == 6 or payload_length == 10): | |
temperature_hex = data[type_start+6:type_start+10] | |
temperature = ((int(temperature_hex[2:4], 16) << 8) + int(temperature_hex[0:2], 16))/10 | |
result = { "temperature" : temperature, "mac": mac, "packet": packetId } | |
if payload_length == 10: | |
result["battery"] = int(data[type_start+16:type_start+18],16) | |
if type == "06" and length == "02" and (payload_length == 6 or payload_length == 10): | |
humidity_hex = data[type_start+6:type_start+10] | |
humidity = ((int(humidity_hex[2:4], 16) << 8) + int(humidity_hex[0:2], 16))/10 | |
result = { "humidity" : humidity, "mac": mac, "packet": packetId } | |
if payload_length == 10: | |
result["battery"] = int(data[type_start+16:type_start+18],16) | |
if type == "0A" and length == "01" and payload_length == 5: | |
battery = int(data[type_start+6:type_start+8],16) | |
result = { "battery" : battery, "mac": mac, "packet": packetId } | |
return result | |
class BLEScanner: | |
hcitool = None | |
hcidump = None | |
tempf = tempfile.TemporaryFile() | |
_LOGGER.debug('Temp dir used: {}'.format(tempfile.gettempdir())) | |
def start(self): | |
_LOGGER.debug('Start receiving broadcasts') | |
DEVNULL = subprocess.DEVNULL if sys.version_info > (3, 0) else open(os.devnull, 'wb') | |
# sudo setcap 'cap_net_raw+ep' `readlink -f \`which hcidump\`` | |
self.hcitool = subprocess.Popen(['hcitool', 'lescan', '--duplicates'], stdout = DEVNULL, stderr = DEVNULL) | |
self.hcidump = subprocess.Popen(['hcidump', '--raw', 'hci'], stdout=self.tempf, stderr=None) | |
def stop(self): | |
_LOGGER.debug('Stop receiving broadcasts') | |
self.hcitool.kill() | |
self.hcidump.kill() | |
def get_lines(self): | |
data = None | |
try: | |
_LOGGER.debug("reading hcidump...") | |
self.tempf.seek(0) | |
for line in self.tempf.readlines(): | |
line = line.decode() | |
#_LOGGER.debug(line) | |
if line.startswith('> '): | |
yield data | |
data = line[2:].strip().replace(' ', '') | |
elif line.startswith('< '): | |
data = None | |
else: | |
if data: | |
data += line.strip().replace(' ', '') | |
self.tempf.seek(0) | |
self.tempf.truncate(0) | |
except Exception as ex: | |
_LOGGER.debug(ex) | |
return [] | |
def setup_platform(hass, config, add_entities, discovery_info=None): | |
"""Set up the sensor platform.""" | |
_LOGGER.debug("Starting") | |
scanner = BLEScanner() | |
scanner.start() | |
sensors_by_mac = {} | |
def discover_ble_devices(): | |
"""Discover Bluetooth LE devices.""" | |
_LOGGER.debug("Discovering Bluetooth LE devices") | |
try: | |
_LOGGER.debug("Stopping") | |
scanner.stop() | |
_LOGGER.debug("Analyzing") | |
hum_m_data = {} | |
temp_m_data = {} | |
batt = {} # battery | |
lpacket = {} # last packet number | |
macs = {} # all found macs | |
for line in scanner.get_lines(): | |
if line: | |
#_LOGGER.debug("Line: {}".format(line)) | |
data = parseRawData(line) | |
if data and "mac" in data: | |
_LOGGER.debug("Parsed: {}".format(data)) | |
# store found readings per device | |
if "temperature" in data: | |
if CONF_MITEMPBT_TMAX >= data["temperature"] >= CONF_MITEMPBT_TMIN: | |
if data["mac"] not in temp_m_data: | |
temp_m_data[data["mac"]] = [] | |
temp_m_data[data["mac"]].append(data["temperature"]) | |
macs[data["mac"]] = data["mac"] | |
elif CONF_MITEMPBT_LOG_SPIKES: | |
_LOGGER.error("Temperature spike: {}".format(data["temperature"])) | |
if "humidity" in data: | |
if CONF_MITEMPBT_HMAX >= data["humidity"] >= CONF_MITEMPBT_HMIN: | |
if data["mac"] not in hum_m_data: | |
hum_m_data[data["mac"]] = [] | |
hum_m_data[data["mac"]].append(data["humidity"]) | |
macs[data["mac"]] = data["mac"] | |
elif CONF_MITEMPBT_LOG_SPIKES: | |
_LOGGER.error("Humidity spike: {}".format(data["humidity"])) | |
if "battery" in data: | |
batt[data["mac"]] = int(data["battery"]) | |
macs[data["mac"]] = data["mac"] | |
lpacket[data["mac"]] = int(data["packet"]) | |
# for every seen device | |
for mac in macs: | |
if mac in sensors_by_mac: | |
sensors = sensors_by_mac[mac] | |
else: | |
sensors = [TemperatureSensor(mac), HumiditySensor(mac)] | |
sensors_by_mac[mac] = sensors | |
add_entities(sensors) | |
sensors[0]._device_state_attributes["last packet id"] = lpacket[mac] | |
sensors[1]._device_state_attributes["last packet id"] = lpacket[mac] | |
if mac in batt: | |
sensors[0]._device_state_attributes[ATTR_BATTERY_LEVEL] = batt[mac] | |
sensors[1]._device_state_attributes[ATTR_BATTERY_LEVEL] = batt[mac] | |
# averaging and states updating | |
tempstate_mean = None | |
humstate_mean = None | |
tempstate_median = None | |
humstate_median = None | |
if CONF_MITEMPBT_USE_MEDIAN: | |
textattr = "last median of" | |
else: | |
textattr = "last mean of" | |
if mac in temp_m_data: | |
try: | |
if CONF_MITEMPBT_ROUNDING: | |
tempstate_median = round(statistics.median(temp_m_data[mac]), CONF_MITEMPBT_DECIMALS) | |
tempstate_mean = round(statistics.mean(temp_m_data[mac]), CONF_MITEMPBT_DECIMALS) | |
else: | |
tempstate_median = statistics.median(temp_m_data[mac]) | |
tempstate_mean = statistics.mean(temp_m_data[mac]) | |
if CONF_MITEMPBT_USE_MEDIAN: | |
sensors[0]._state = tempstate_median | |
else: | |
sensors[0]._state = tempstate_mean | |
sensors[0]._device_state_attributes[textattr] = len(temp_m_data[mac]) | |
sensors[0]._device_state_attributes["median"] = tempstate_median | |
sensors[0]._device_state_attributes["mean"] = tempstate_mean | |
sensors[0].async_schedule_update_ha_state() | |
except AttributeError: | |
_LOGGER.info("Sensor {} not yet ready for update".format(mac)) | |
except ZeroDivisionError: | |
_LOGGER.error("mitemp_bt: Division by zero while temperature averaging!") | |
continue | |
if mac in hum_m_data: | |
try: | |
if CONF_MITEMPBT_ROUNDING: | |
humstate_median = round(statistics.median(hum_m_data[mac]), CONF_MITEMPBT_DECIMALS) | |
humstate_mean = round(statistics.mean(hum_m_data[mac]), CONF_MITEMPBT_DECIMALS) | |
else: | |
humstate_median = statistics.median(hum_m_data[mac]) | |
humstate_mean = statistics.mean(hum_m_data[mac]) | |
if CONF_MITEMPBT_USE_MEDIAN: | |
sensors[1]._state = humstate_median | |
else: | |
sensors[1]._state = humstate_mean | |
sensors[1]._device_state_attributes[textattr] = len(hum_m_data[mac]) | |
sensors[1]._device_state_attributes["median"] = humstate_median | |
sensors[1]._device_state_attributes["mean"] = humstate_mean | |
sensors[1].async_schedule_update_ha_state() | |
except AttributeError: | |
_LOGGER.info("Sensor {} not yet ready for update".format(mac)) | |
except ZeroDivisionError: | |
_LOGGER.error("mitemp_bt: Division by zero while humidity averaging!") | |
continue | |
scanner.start() | |
except RuntimeError as error: | |
_LOGGER.error("Error during Bluetooth LE scan: %s", error) | |
return {} | |
return [] | |
def update_ble(now): | |
_LOGGER.debug("update_ble called") | |
"""Lookup Bluetooth LE devices and update status.""" | |
devs = discover_ble_devices() | |
track_point_in_utc_time(hass, update_ble, dt_util.utcnow() + timedelta(seconds=CONF_MITEMPBT_PERIOD)) | |
update_ble(dt_util.utcnow()) | |
class TemperatureSensor(Entity): | |
"""Representation of a Sensor.""" | |
def __init__(self, mac): | |
"""Initialize the sensor.""" | |
self._state = None | |
self._battery = None | |
self._unique_id = "t_"+mac | |
self._device_state_attributes = {} | |
@property | |
def name(self): | |
"""Return the name of the sensor.""" | |
return 'mi {}'.format(self._unique_id) | |
@property | |
def state(self): | |
"""Return the state of the sensor.""" | |
return self._state | |
@property | |
def unit_of_measurement(self): | |
"""Return the unit of measurement.""" | |
return TEMP_CELSIUS | |
@property | |
def device_class(self): | |
"""Return the unit of measurement.""" | |
return DEVICE_CLASS_TEMPERATURE | |
@property | |
def should_poll(self): | |
"""No polling needed.""" | |
return False | |
@property | |
def device_state_attributes(self): | |
"""Return the state attributes.""" | |
return self._device_state_attributes | |
@property | |
def unique_id(self) -> str: | |
"""Return a unique ID.""" | |
return self._unique_id | |
@property | |
def force_update(self): | |
"""Force update.""" | |
return True | |
class HumiditySensor(Entity): | |
"""Representation of a Sensor.""" | |
def __init__(self, mac): | |
"""Initialize the sensor.""" | |
self._state = None | |
self._battery = None | |
self._unique_id = "h_"+mac | |
self._device_state_attributes = {} | |
@property | |
def name(self): | |
"""Return the name of the sensor.""" | |
return 'mi {}'.format(self._unique_id) | |
@property | |
def state(self): | |
"""Return the state of the sensor.""" | |
return self._state | |
@property | |
def unit_of_measurement(self): | |
"""Return the unit of measurement.""" | |
return "%" | |
@property | |
def device_class(self): | |
"""Return the unit of measurement.""" | |
return DEVICE_CLASS_HUMIDITY | |
@property | |
def should_poll(self): | |
"""No polling needed.""" | |
return False | |
@property | |
def device_state_attributes(self): | |
"""Return the state attributes.""" | |
return self._device_state_attributes | |
@property | |
def unique_id(self) -> str: | |
"""Return a unique ID.""" | |
return self._unique_id | |
@property | |
def force_update(self): | |
"""Force update.""" | |
return True |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment