Created
May 7, 2018 02:34
-
-
Save balloob/031461de91ab2b06ff1efb75623aa7d9 to your computer and use it in GitHub Desktop.
Basis for a component to prototype bluetooth discovery
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Basis for a BLE discovery component.

To add to Home Assistant, copy this file to
<config>/custom_components/ble_discovery.py

Add to your configuration.yaml:

```
ble_discovery:

logger:
  default: info
  logs:
    custom_components.ble_discovery: debug
```

Lifecycle of the component:
 - Start scanning
 - Discover a device that broadcasts a URL with a device description
 - Parse the description, find the available services
 - Establish a connection with the device
 - Create a new Home Assistant entity for each available service on the device
 - Connection lost with the device: remove its entities from Home Assistant

Left to implement are the parts that interact with BLE (marked with TODO):
 - Discover GATT devices and retrieve the URL with the device description
 - Connect to GATT devices
 - Read a boolean characteristic
 - Write a boolean characteristic

All TODO parts are currently mocked so you can test this without BLE.
"""
import logging | |
import voluptuous as vol | |
from homeassistant.const import ( | |
SERVICE_TURN_ON, SERVICE_TURN_OFF, EVENT_HOMEASSISTANT_START, | |
EVENT_HOMEASSISTANT_STOP, ATTR_ENTITY_ID) | |
from homeassistant.helpers.entity_component import EntityComponent | |
from homeassistant.helpers.entity import ToggleEntity | |
from homeassistant.helpers import config_validation as cv | |
# Domain under which the entity component and its services are registered.
DOMAIN = 'ble_discovery'

_LOGGER = logging.getLogger(__name__)

# Validation schema applied to the turn_on / turn_off service calls.
SERVICE_SCHEMA = vol.Schema({
    ATTR_ENTITY_ID: cv.entity_ids,
})
# MOCK: Fake description used for discovered devices.
# It describes a single device that exposes two BinarySwitch services, each
# backed by its own characteristic.
MOCK_DESCRIPTION = {
    'name': 'Fake device',
    'services': [
        {
            'type': 'BinarySwitch',
            'characteristic_id': 'characteristic_1',
            'name': 'LED 1',
        },
        {
            'type': 'BinarySwitch',
            'characteristic_id': 'characteristic_2',
            'name': 'LED 2',
        },
    ],
}
def setup(hass, config):
    """Set up the BLE discovery component.

    Creates the entity component and the BLE interface, registers the
    turn_on / turn_off services and hooks scanning into the Home Assistant
    start and stop events.

    Args:
        hass: The Home Assistant instance.
        config: The configuration handed in by Home Assistant (not used).

    Returns:
        True, to indicate that the component was set up successfully.
    """
    component = EntityComponent(_LOGGER, DOMAIN, hass)
    ble_interface = BLEInterface(component)

    # Register services to turn device on/off
    def handle_service(service):
        """Handle calls to the turn_on / turn_off services."""
        turn_on = service.service == SERVICE_TURN_ON

        # The entity_id key is not marked as required in SERVICE_SCHEMA, so a
        # call may arrive without it; treat that as "nothing to do".
        for entity_id in service.data.get(ATTR_ENTITY_ID, []):
            entity = component.get_entity(entity_id)

            # The id may not belong to this component, or the entity may
            # already have been removed after its device disconnected.
            if entity is None:
                _LOGGER.warning("Unable to find entity %s", entity_id)
                continue

            if turn_on:
                entity.turn_on()
            else:
                entity.turn_off()

            # Read the value back and push the new state to Home Assistant.
            entity.update()
            entity.schedule_update_ha_state()

    # The schema is passed by keyword so it cannot be bound to a different
    # positional parameter of services.register.
    hass.services.register(
        DOMAIN, SERVICE_TURN_OFF, handle_service, schema=SERVICE_SCHEMA)
    hass.services.register(
        DOMAIN, SERVICE_TURN_ON, handle_service, schema=SERVICE_SCHEMA)

    # Register to start/stop scanning when Home Assistant starts/stops
    hass.bus.listen(EVENT_HOMEASSISTANT_START,
                    lambda event: ble_interface.start_scan())
    hass.bus.listen(EVENT_HOMEASSISTANT_STOP,
                    lambda event: ble_interface.stop_scan())

    # Indicate that the component was setup successfully
    return True
class BLEInterface:
    """Stub for a BLE interface.

    Keeps track of discovered devices and of the entities created for them.
    The parts that would talk to real Bluetooth hardware are marked with
    TODO and are currently mocked.
    """

    def __init__(self, component):
        """Initialize the interface.

        Args:
            component: The entity component that new entities are added to.
        """
        self.component = component

        # device_id -> {'description': ..., 'entities': [...]}. The
        # 'entities' key is only present once a connection is established.
        self.devices = {}

        # MOCK: keep track of values we are supposed to read/write
        self._mock_cache = {}

    def start_scan(self):
        """Start the Bluetooth scanning.

        Discover devices that are broadcasting a specific packet. This packet
        should contain a URL pointing at a device schema. Pass device ID and
        URL to self.device_discovered(device_id, url).

        The URL will be fetched and we will call self.connect(device_id) if we
        can handle it.
        """
        _LOGGER.debug("Start scanning")
        # TODO

        # MOCK: Imitate we discover a device
        self.device_discovered('mock_device',
                               'http://example.com/device_schema.json')

        # MOCK: Imitate we lose connection in 10 seconds
        # from threading import Timer
        # Timer(10, self.connection_closed, args=['mock_device']).start()

    def stop_scan(self):
        """Stop the bluetooth scanning."""
        _LOGGER.debug("Stop scanning")
        # TODO

    def connect(self, device_id):
        """Initialize a connection with a device.

        Should call self.connection_established(device_id) when successful.
        Once connected, should call self.connection_closed(device_id) when
        connection is lost.
        """
        _LOGGER.debug("Connecting to %s", device_id)
        # TODO

        # MOCK: Imitate a successful connection
        self.connection_established(device_id)

    def device_discovered(self, device_id, description_url):
        """Called from the scanning process when a new device is discovered.

        A device that is already known is ignored. A new device gets its
        description stored, after which a connection attempt is started.
        """
        if device_id in self.devices:
            _LOGGER.debug("Discovered existing device %s", device_id)
            return

        _LOGGER.debug("Discovering new device %s: %s",
                      device_id, description_url)

        # If we wanted to fetch the url for real
        # import requests
        # description = requests.get(description_url).json()

        # MOCK: Stub description
        description = MOCK_DESCRIPTION

        # Store the description for the device and try to connect
        self.devices[device_id] = {
            'description': description
        }
        self.connect(device_id)

    def connection_established(self, device_id):
        """Called when a connection with a device has been established.

        Creates an entity for every supported service in the device
        description, reads their initial values and adds them to Home
        Assistant. Unknown service types are logged and skipped.
        """
        _LOGGER.debug('Connection established with %s', device_id)

        # The device may have been dropped (connection_closed) before the
        # connection attempt reported back; there is nothing to set up then.
        device = self.devices.get(device_id)
        if device is None:
            _LOGGER.warning('Connection established with unknown device %s',
                            device_id)
            return

        entities = []

        for service in device['description']['services']:
            if service['type'] == 'BinarySwitch':
                entities.append(BinarySwitchEntity(self, device_id, service))
                # MOCK: Initialize a fake value
                self._mock_cache[
                    (device_id, service['characteristic_id'])] = False
            else:
                _LOGGER.warning('Unknown service type %s', service['type'])

        device['entities'] = entities

        # Update entities and add to Home Assistant
        for entity in entities:
            entity.update()

        self.component.add_entities(entities)

    def connection_closed(self, device_id):
        """Called when a connection with a device has been closed.

        Forgets the device and schedules removal of the entities that were
        created for it. Unknown devices are ignored.
        """
        _LOGGER.debug('Connection closed with %s', device_id)

        connection = self.devices.pop(device_id, None)

        if connection is None:
            return

        # 'entities' is only stored by connection_established, so a device
        # that was discovered but never connected has none to remove.
        for entity in connection.get('entities', []):
            self.component.hass.add_job(entity.async_remove)

    def read_boolean(self, device_id, characteristic_id):
        """Read a boolean characteristic from a device."""
        _LOGGER.debug('Read boolean %s.%s', device_id, characteristic_id)
        # TODO

        # MOCK: Return value from mock_cache
        return self._mock_cache[(device_id, characteristic_id)]

    def write_boolean(self, device_id, characteristic_id, value):
        """Write a boolean to a characteristic on a device."""
        _LOGGER.debug('Write boolean %s to %s.%s',
                      value, device_id, characteristic_id)
        # TODO

        # MOCK: Set value in mock_cache
        self._mock_cache[(device_id, characteristic_id)] = value
class BinarySwitchEntity(ToggleEntity):
    """Toggle entity backed by a single BinarySwitch service of a device."""

    def __init__(self, ble_interface, device_id, service):
        """Store the interface, the device id and the service description."""
        self._ble_interface = ble_interface
        self._device_id = device_id
        self._service = service
        # Last value read from the characteristic; refreshed by update().
        self._is_on = False

    @property
    def _characteristic_id(self):
        """Id of the characteristic that holds the state of this switch."""
        return self._service['characteristic_id']

    @property
    def name(self):
        """Return the name taken from the service description."""
        return self._service['name']

    @property
    def is_on(self):
        """Return the most recently read state of the switch."""
        return self._is_on

    @property
    def should_poll(self):
        """Return False: this entity is not polled by Home Assistant."""
        return False

    @property
    def device_state_attributes(self):
        """Return the device id and characteristic id as extra attributes."""
        attributes = {}
        attributes['device'] = self._device_id
        attributes['characteristic'] = self._characteristic_id
        return attributes

    def turn_on(self, **kwargs):
        """Write True to the characteristic."""
        self._ble_interface.write_boolean(
            self._device_id, self._characteristic_id, True)

    def turn_off(self, **kwargs):
        """Write False to the characteristic."""
        self._ble_interface.write_boolean(
            self._device_id, self._characteristic_id, False)

    def update(self):
        """Refresh the cached state by reading the characteristic."""
        current = self._ble_interface.read_boolean(
            self._device_id, self._characteristic_id)
        self._is_on = current
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment