-
-
Save puddly/7db5ecc9b4f618e37583922dce2f4b83 to your computer and use it in GitHub Desktop.
Minimal application for Xiaomi Zigbee communication with bellows
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import asyncio | |
import bellows.ezsp | |
import bellows.types | |
import bellows.zigbee.profiles | |
from bellows.zigbee.zcl import foundation, clusters | |
from bellows.zigbee.application import ControllerApplication | |
# Attribute ID whose updates are decoded as Xiaomi heartbeat payloads.
HEARTBEAT_ATTRIBUTE_ID = 0xFF01

# Log everything from DEBUG upwards; `logger` is the root logger.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger()
def decode_xiaomi_heartbeat(data):
    """Decode a Xiaomi heartbeat payload into a dictionary.

    The payload is consumed as a sequence of records. Each record is a
    one-item Xiaomi tag followed by a value that is parsed with
    ``foundation.TypeValue.deserialize``; whatever that call returns as
    the remainder becomes the input for the next record.

    Args:
        data: The raw heartbeat payload. Must support truthiness,
            indexing and slicing (e.g. ``bytes``).

    Returns:
        A dict with any of the following keys, depending on which tags
        were present:

        * ``battery_percentage`` -- despite the name, a fraction clamped
          to the range 0.0-1.0, linearly interpolated between 2600 and
          3200 (presumably millivolts).
        * ``battery_voltage`` -- the raw battery value divided by 1000.
        * ``soc_temperature`` -- value of tag ``0x03``.
        * ``state`` -- value of tag ``0x64``.
        * ``unknown`` -- ``{tag: value}`` for every unrecognized tag;
          omitted when there are none.

        An empty payload yields an empty dict.
    """
    # Raw battery readings treated as "empty" and "full".
    battery_empty = 2600
    battery_full = 3200

    heartbeat = {}
    unknown = {}

    while data:
        xiaomi_type = data[0]
        value, data = foundation.TypeValue.deserialize(data[1:])
        value = value.value

        if xiaomi_type == 0x01:
            # I'm sure there's a more accurate way to do this
            charge = (value - battery_empty) / (battery_full - battery_empty)
            # Readings outside the empty/full window would otherwise be
            # reported as a negative or >100% charge.
            heartbeat['battery_percentage'] = min(max(charge, 0.0), 1.0)
            heartbeat['battery_voltage'] = value / 1000
        elif xiaomi_type == 0x03:
            heartbeat['soc_temperature'] = value
        elif xiaomi_type == 0x64:
            heartbeat['state'] = value
        else:
            unknown[xiaomi_type] = value

    if unknown:
        heartbeat['unknown'] = unknown

    return heartbeat
class LongTimeoutControllerApplication(ControllerApplication):
    """ControllerApplication that sets the end-device poll timeout on init."""

    async def initialize(self):
        """Run the parent initialization, then write the poll-timeout config."""
        await super().initialize()

        config_ids = bellows.types.EzspConfigId

        # 5 * 2^10 = 5,120 seconds
        poll_timeout_settings = (
            (config_ids.CONFIG_END_DEVICE_POLL_TIMEOUT, 5),
            (config_ids.CONFIG_END_DEVICE_POLL_TIMEOUT_SHIFT, 10),
        )

        # Applied one after another, in the order listed above.
        for config_id, setting in poll_timeout_settings:
            await self._cfg(config_id, setting)
class TestListener:
    """Listener that logs device lifecycle events and Xiaomi heartbeats.

    An instance is registered both on the application (for the
    ``device_*`` callbacks) and on each device's Basic cluster (for
    ``attribute_updated``).
    """

    def __init__(self, application):
        """Remember the controller application this listener belongs to."""
        self.application = application

    def device_initialized(self, device):
        """Subscribe to attribute updates of the Basic cluster on endpoint 1.

        Raises:
            KeyError: If the device has no endpoint 1 or that endpoint has
                no Basic input cluster.
        """
        logger.debug('Device initialized: %r', device)

        cluster = device.endpoints[1].in_clusters[clusters.general.Basic.cluster_id]
        cluster.add_listener(self)

    def device_joined(self, device):
        """Schedule the asynchronous setup of a newly joined device."""
        logger.debug('Device joined: %r', device)

        # async_device_joined() accepts only the device; any extra keyword
        # argument would raise TypeError before the task is scheduled.
        asyncio.ensure_future(self.async_device_joined(device))

    async def async_device_joined(self, device):
        """Bind the OnOff cluster on endpoint 1 and configure reporting.

        Retries forever, waiting 10 seconds after each failed attempt,
        until both the bind and the reporting configuration succeed.
        """
        logger.debug('Device joined: %r', device)

        while True:
            logger.info('Attempting to set up the device...')

            try:
                cluster = device.endpoints[1].in_clusters[clusters.general.OnOff.cluster_id]

                await asyncio.sleep(1)
                logger.info('Trying to bind to cluster...')
                bind_response = await cluster.bind()
                logger.debug('Bind response is: %r', bind_response)

                await asyncio.sleep(1)
                logger.info('Trying to configure reporting...')
                reporting_response = await cluster.configure_reporting(
                    attribute=0,
                    min_interval=60*30,
                    max_interval=60*45,
                    reportable_change=1,
                )
                logger.info('Configure response is: %r', reporting_response)
                break
            except Exception:
                # Deliberately broad: any failure is logged with its
                # traceback and the whole setup is attempted again.
                logger.exception('Got an error requesting attribute reporting!')
                await asyncio.sleep(10)

    def attribute_updated(self, attribute_id, value):
        """Log the decoded heartbeat; ignore every other attribute."""
        if attribute_id != HEARTBEAT_ATTRIBUTE_ID:
            return

        logger.debug('Received a heartbeat: %r', decode_xiaomi_heartbeat(value))

    def device_left(self, device):
        """Log that a device left."""
        logger.debug('Device left: %r', device)

    def device_removed(self, device):
        """Log that a device was removed."""
        logger.debug('Device removed: %r', device)
async def main():
    """Start the Zigbee application, permit joins, then keep running.

    Connects an EZSP instance using '/dev/ttyUSB1' and 57600, starts a
    LongTimeoutControllerApplication backed by '.homeassistant/zigbee.db',
    attaches a TestListener to it and to every already-known device,
    permits joins for 60 seconds and finally sleeps for 100000 seconds.
    """
    join_window = 60

    radio = bellows.ezsp.EZSP()
    await radio.connect('/dev/ttyUSB1', 57600)

    app = LongTimeoutControllerApplication(radio, '.homeassistant/zigbee.db')
    observer = TestListener(app)
    app.add_listener(observer)

    await asyncio.sleep(1)
    await app.startup()

    # Devices already known to the application get the same treatment
    # as freshly initialized ones.
    for known_device in app.devices.values():
        observer.device_initialized(known_device)

    logger.debug('Allowing joins for 60 seconds...')
    await app.permit(join_window)
    # Wait one second longer than the permit window before reporting.
    await asyncio.sleep(join_window + 1)
    logger.debug('Joins are done!')

    # Keep the coroutine (and with it the event loop) alive.
    await asyncio.sleep(100000)
if __name__ == '__main__':
    # Run main() to completion on the current event loop.
    asyncio.get_event_loop().run_until_complete(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment