Skip to content

Instantly share code, notes, and snippets.

@puddly

puddly/xiaomi.py Secret

Created December 12, 2017 01:39
Show Gist options
  • Save puddly/7db5ecc9b4f618e37583922dce2f4b83 to your computer and use it in GitHub Desktop.
Save puddly/7db5ecc9b4f618e37583922dce2f4b83 to your computer and use it in GitHub Desktop.
Minimal application for Xiaomi Zigbee communication with bellows
import logging
import asyncio
import bellows.ezsp
import bellows.types
import bellows.zigbee.profiles
from bellows.zigbee.zcl import foundation, clusters
from bellows.zigbee.application import ControllerApplication
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger()
HEARTBEAT_ATTRIBUTE_ID = 0xFF01
def decode_xiaomi_heartbeat(data):
from bellows.zigbee.zcl import foundation
heartbeat = {}
unknown = {}
while data:
xiaomi_type = data[0]
value, data = foundation.TypeValue.deserialize(data[1:])
value = value.value
if xiaomi_type == 0x01:
# I'm sure there's a more accurate way to do this
heartbeat['battery_percentage'] = (value - 2600) / (3200 - 2600)
heartbeat['battery_voltage'] = value / 1000
elif xiaomi_type == 0x03:
heartbeat['soc_temperature'] = value
elif xiaomi_type == 0x64:
heartbeat['state'] = value
else:
unknown[xiaomi_type] = value
if unknown:
heartbeat['unknown'] = unknown
return heartbeat
class LongTimeoutControllerApplication(ControllerApplication):
async def initialize(self):
await super().initialize()
# 5 * 2^10 = 5,120 seconds
await self._cfg(bellows.types.EzspConfigId.CONFIG_END_DEVICE_POLL_TIMEOUT, 5)
await self._cfg(bellows.types.EzspConfigId.CONFIG_END_DEVICE_POLL_TIMEOUT_SHIFT, 10)
class TestListener:
def __init__(self, application):
self.application = application
def device_initialized(self, device):
logger.debug('Device initialized: %r', device)
cluster = device.endpoints[1].in_clusters[clusters.general.Basic.cluster_id]
cluster.add_listener(self)
def device_joined(self, device):
logger.debug('Device joined: %r', device)
asyncio.ensure_future(self.async_device_joined(device, joined=True))
async def async_device_joined(self, device):
logger.debug('Device joined: %r', device)
while True:
logging.info('Attempting to set up the device...')
try:
cluster = device.endpoints[1].in_clusters[clusters.general.OnOff.cluster_id]
await asyncio.sleep(1)
logger.info('Trying to bind to cluster...')
bind_response = await cluster.bind()
logging.debug('Bind response is: %r', bind_response)
await asyncio.sleep(1)
logger.info('Trying to configure reporting...')
reporting_response = await cluster.configure_reporting(attribute=0, min_interval=60*30, max_interval=60*45, reportable_change=1)
logging.info('Configure response is: %r', reporting_response)
break
except Exception as e:
logger.exception('Got an error requesting attribute reporting!')
await asyncio.sleep(10)
continue
def attribute_updated(self, attribute_id, value):
if attribute_id != HEARTBEAT_ATTRIBUTE_ID:
return
logger.debug('Received a heartbeat: %r', decode_xiaomi_heartbeat(value))
def device_left(self, device):
logger.debug('Device left: %r', device)
def device_removed(self, device):
logger.debug('Device removed: %r', device)
async def main():
ezsp = bellows.ezsp.EZSP()
await ezsp.connect('/dev/ttyUSB1', 57600)
application = LongTimeoutControllerApplication(ezsp, '.homeassistant/zigbee.db')
listener = TestListener(application)
application.add_listener(listener)
await asyncio.sleep(1)
await application.startup()
for device in application.devices.values():
listener.device_initialized(device)
logger.debug('Allowing joins for 60 seconds...')
await application.permit(60)
await asyncio.sleep(60 + 1)
logger.debug('Joins are done!')
await asyncio.sleep(100000)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment